[app-server] feat: thread/resume supports history, path, and overrides (#6483)

This updates `thread/resume` to be at parity with v1's
`ResumeConversationParams`. Turns out history is useful for codex cloud
and path is useful for the VSCode extension. And config overrides are
always useful.
This commit is contained in:
Owen Lin
2025-11-12 14:02:43 -08:00
committed by GitHub
parent 2f58e69997
commit 964220ac94
4 changed files with 339 additions and 96 deletions

View File

@@ -8,6 +8,7 @@ use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
use codex_protocol::items::TurnItem as CoreTurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow;
use codex_protocol::user_input::UserInput as CoreUserInput;
@@ -290,11 +291,39 @@ pub struct ThreadStartResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// There are three ways to resume a thread:
/// 1. By thread_id: load the thread from disk by thread_id and resume it.
/// 2. By history: instantiate the thread from memory and resume it.
/// 3. By path: load the thread from disk by path and resume it.
///
/// The precedence is: history > path > thread_id.
/// If using history or path, the thread_id param will be ignored.
///
/// Prefer using thread_id whenever possible.
pub struct ThreadResumeParams {
pub thread_id: String,
/// [UNSTABLE] FOR CODEX CLOUD - DO NOT USE.
/// If specified, the thread will be resumed with the provided history
/// instead of loaded from disk.
pub history: Option<Vec<ResponseItem>>,
/// [UNSTABLE] Specify the rollout path to resume from.
/// If specified, the thread_id param will be ignored.
pub path: Option<PathBuf>,
/// Configuration overrides for the resumed thread, if any.
pub model: Option<String>,
pub model_provider: Option<String>,
pub cwd: Option<String>,
pub approval_policy: Option<AskForApproval>,
pub sandbox: Option<SandboxMode>,
pub config: Option<HashMap<String, serde_json::Value>>,
pub base_instructions: Option<String>,
pub developer_instructions: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -361,6 +390,8 @@ pub struct Thread {
pub model_provider: String,
/// Unix timestamp (in seconds) when the thread was created.
pub created_at: i64,
/// [UNSTABLE] Path to the thread on disk.
pub path: PathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -1201,21 +1201,17 @@ impl CodexMessageProcessor {
}
async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) {
// Build ConfigOverrides directly from ThreadStartParams for config derivation.
let cli_overrides = params.config;
let overrides = ConfigOverrides {
model: params.model,
cwd: params.cwd.map(PathBuf::from),
approval_policy: params.approval_policy.map(AskForApproval::to_core),
sandbox_mode: params.sandbox.map(SandboxMode::to_core),
model_provider: params.model_provider,
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions: params.base_instructions,
developer_instructions: params.developer_instructions,
..Default::default()
};
let overrides = self.build_thread_config_overrides(
params.model,
params.model_provider,
params.cwd,
params.approval_policy,
params.sandbox,
params.base_instructions,
params.developer_instructions,
);
let config = match derive_config_from_params(overrides, cli_overrides).await {
let config = match derive_config_from_params(overrides, params.config).await {
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
@@ -1244,16 +1240,15 @@ impl CodexMessageProcessor {
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
warn!(
"failed to load summary for new thread {}: {}",
conversation_id, err
);
Thread {
id: conversation_id.to_string(),
preview: String::new(),
model_provider: self.config.model_provider_id.clone(),
created_at: chrono::Utc::now().timestamp(),
}
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
@@ -1292,6 +1287,31 @@ impl CodexMessageProcessor {
}
}
#[allow(clippy::too_many_arguments)]
fn build_thread_config_overrides(
&self,
model: Option<String>,
model_provider: Option<String>,
cwd: Option<String>,
approval_policy: Option<codex_app_server_protocol::AskForApproval>,
sandbox: Option<SandboxMode>,
base_instructions: Option<String>,
developer_instructions: Option<String>,
) -> ConfigOverrides {
ConfigOverrides {
model,
model_provider,
cwd: cwd.map(PathBuf::from),
approval_policy: approval_policy
.map(codex_app_server_protocol::AskForApproval::to_core),
sandbox_mode: sandbox.map(SandboxMode::to_core),
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions,
developer_instructions,
..Default::default()
}
}
async fn thread_archive(&mut self, request_id: RequestId, params: ThreadArchiveParams) {
let conversation_id = match ConversationId::from_string(&params.thread_id) {
Ok(id) => id,
@@ -1374,88 +1394,147 @@ impl CodexMessageProcessor {
}
async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
let conversation_id = match ConversationId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
let ThreadResumeParams {
thread_id,
history,
path,
model,
model_provider,
cwd,
approval_policy,
sandbox,
config: cli_overrides,
base_instructions,
developer_instructions,
} = params;
let overrides_requested = model.is_some()
|| model_provider.is_some()
|| cwd.is_some()
|| approval_policy.is_some()
|| sandbox.is_some()
|| cli_overrides.is_some()
|| base_instructions.is_some()
|| developer_instructions.is_some();
let config = if overrides_requested {
let overrides = self.build_thread_config_overrides(
model,
model_provider,
cwd,
approval_policy,
sandbox,
base_instructions,
developer_instructions,
);
match derive_config_from_params(overrides, cli_overrides).await {
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
}
} else {
self.config.as_ref().clone()
};
let conversation_history = if let Some(history) = history {
if history.is_empty() {
self.send_invalid_request_error(
request_id,
"history must not be empty".to_string(),
)
.await;
return;
}
InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect())
} else if let Some(path) = path {
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to load rollout `{}`: {err}", path.display()),
)
.await;
return;
}
}
} else {
let existing_conversation_id = match ConversationId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let path = match find_conversation_path_by_id_str(
&self.config.codex_home,
&existing_conversation_id.to_string(),
)
.await
{
Ok(Some(p)) => p,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for conversation id {existing_conversation_id}"),
)
.await;
return;
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!(
"failed to locate conversation id {existing_conversation_id}: {err}"
),
)
.await;
return;
}
};
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to load rollout `{}`: {err}", path.display()),
)
.await;
return;
}
}
};
let path = match find_conversation_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
)
.await
{
Ok(Some(p)) => p,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for conversation id {conversation_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate conversation id {conversation_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let fallback_provider = self.config.model_provider_id.as_str();
let summary = match read_summary_from_rollout(&path, fallback_provider).await {
Ok(s) => s,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to load rollout `{}`: {err}", path.display()),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let initial_history = match RolloutRecorder::get_rollout_history(&summary.path).await {
Ok(initial_history) => initial_history,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
summary.path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let fallback_model_provider = config.model_provider_id.clone();
match self
.conversation_manager
.resume_conversation_with_history(
self.config.as_ref().clone(),
initial_history,
config,
conversation_history,
self.auth_manager.clone(),
)
.await
{
Ok(_) => {
let thread = summary_to_thread(summary);
Ok(NewConversation {
conversation_id,
session_configured,
..
}) => {
// Auto-attach a conversation listener when resuming a thread.
if let Err(err) = self
.attach_conversation_listener(conversation_id, false)
@@ -1468,6 +1547,25 @@ impl CodexMessageProcessor {
);
}
let thread = match read_summary_from_rollout(
session_configured.rollout_path.as_path(),
fallback_model_provider.as_str(),
)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
session_configured.rollout_path.display()
),
)
.await;
return;
}
};
let response = ThreadResumeResponse { thread };
self.outgoing.send_response(request_id, response).await;
}
@@ -1878,6 +1976,15 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
}
async fn send_internal_error(&self, request_id: RequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
async fn archive_conversation(
&mut self,
request_id: RequestId,
@@ -2867,6 +2974,7 @@ fn parse_datetime(timestamp: Option<&str>) -> Option<DateTime<Utc>> {
fn summary_to_thread(summary: ConversationSummary) -> Thread {
let ConversationSummary {
conversation_id,
path,
preview,
timestamp,
model_provider,
@@ -2880,6 +2988,7 @@ fn summary_to_thread(summary: ConversationSummary) -> Thread {
preview,
model_provider,
created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0),
path,
}
}

View File

@@ -8,13 +8,15 @@ use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_resume_returns_existing_thread() -> Result<()> {
async fn thread_resume_returns_original_thread() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -40,6 +42,7 @@ async fn thread_resume_returns_existing_thread() -> Result<()> {
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
@@ -54,6 +57,105 @@ async fn thread_resume_returns_existing_thread() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5-codex".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
let thread_path = thread.path.clone();
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: "not-a-valid-thread-id".to_string(),
path: Some(thread_path),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread: resumed } =
to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed, thread);
Ok(())
}
#[tokio::test]
async fn thread_resume_supports_history_and_overrides() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5-codex".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
let history_text = "Hello from history";
let history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: history_text.to_string(),
}],
}];
// Resume with explicit history and override the model.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
history: Some(history),
model: Some("mock-model".to_string()),
model_provider: Some("mock_provider".to_string()),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread: resumed } =
to_response::<ThreadResumeResponse>(resume_resp)?;
assert!(!resumed.id.is_empty());
assert_eq!(resumed.model_provider, "mock_provider");
assert_eq!(resumed.preview, history_text);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -301,6 +301,7 @@ async fn prompt_tools_are_consistent_across_requests() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "flaky on ubuntu-24.04-arm - aarch64-unknown-linux-gpu"]
async fn prefixes_context_and_instructions_once_and_consistently_across_requests() {
skip_if_no_network!();
use pretty_assertions::assert_eq;