mirror of
https://github.com/openai/codex.git
synced 2026-04-30 01:16:54 +00:00
codex-rs: fix thread resume rejoin semantics (#11756)
## Summary - always rejoin an in-memory running thread on `thread/resume`, even when overrides are present - reject `thread/resume` when `history` is provided for a running thread - reject `thread/resume` when `path` mismatches the running thread rollout path - warn (but do not fail) on override mismatches for running threads - add more `thread_resume` integration tests and fixes; including restart-based resume-with-overrides coverage ## Validation - `just fmt` - `cargo test -p codex-app-server --test all thread_resume` - manual test with app-server-test-client https://github.com/openai/codex/pull/11755 - manual test both stdio and websocket in app
This commit is contained in:
@@ -2689,6 +2689,13 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) {
|
||||
if self
|
||||
.resume_running_thread(request_id.clone(), ¶ms)
|
||||
.await
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let ThreadResumeParams {
|
||||
thread_id,
|
||||
history,
|
||||
@@ -2706,77 +2713,21 @@ impl CodexMessageProcessor {
|
||||
} = params;
|
||||
|
||||
let thread_history = if let Some(history) = history {
|
||||
if history.is_empty() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"history must not be empty".to_string(),
|
||||
)
|
||||
.await;
|
||||
let Some(thread_history) = self
|
||||
.resume_thread_from_history(request_id.clone(), history.as_slice())
|
||||
.await
|
||||
else {
|
||||
return;
|
||||
}
|
||||
InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect())
|
||||
} else if let Some(path) = path {
|
||||
match RolloutRecorder::get_rollout_history(&path).await {
|
||||
Ok(initial_history) => initial_history,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to load rollout `{}`: {err}", path.display()),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
thread_history
|
||||
} else {
|
||||
let existing_thread_id = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid thread id: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
let Some(thread_history) = self
|
||||
.resume_thread_from_rollout(request_id.clone(), &thread_id, path.as_ref())
|
||||
.await
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match RolloutRecorder::get_rollout_history(&path).await {
|
||||
Ok(initial_history) => initial_history,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to load rollout `{}`: {err}", path.display()),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
thread_history
|
||||
};
|
||||
|
||||
let history_cwd = thread_history.session_cwd();
|
||||
@@ -2857,41 +2808,17 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
|
||||
let mut thread = match read_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let Some(thread) = self
|
||||
.load_thread_from_rollout_or_send_internal(
|
||||
request_id.clone(),
|
||||
thread_id,
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
else {
|
||||
return;
|
||||
};
|
||||
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(items) => {
|
||||
thread.turns = build_turns_from_rollout_items(&items);
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let response = ThreadResumeResponse {
|
||||
thread,
|
||||
@@ -2916,6 +2843,278 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn resume_running_thread(
|
||||
&mut self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: &ThreadResumeParams,
|
||||
) -> bool {
|
||||
if let Ok(existing_thread_id) = ThreadId::from_string(¶ms.thread_id)
|
||||
&& let Ok(existing_thread) = self.thread_manager.get_thread(existing_thread_id).await
|
||||
{
|
||||
if params.history.is_some() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!(
|
||||
"cannot resume thread {existing_thread_id} with history while it is already running"
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
|
||||
let rollout_path = if let Some(path) = existing_thread.rollout_path() {
|
||||
if path.exists() {
|
||||
path
|
||||
} else {
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(requested_path) = params.path.as_ref()
|
||||
&& requested_path != &rollout_path
|
||||
{
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!(
|
||||
"cannot resume running thread {existing_thread_id} with mismatched path: requested `{}`, active `{}`",
|
||||
requested_path.display(),
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.ensure_conversation_listener(
|
||||
existing_thread_id,
|
||||
request_id.connection_id,
|
||||
false,
|
||||
ApiVersion::V2,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
"failed to attach listener for thread {}: {}",
|
||||
existing_thread_id,
|
||||
err.message
|
||||
);
|
||||
}
|
||||
|
||||
let config_snapshot = existing_thread.config_snapshot().await;
|
||||
let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot);
|
||||
if !mismatch_details.is_empty() {
|
||||
tracing::warn!(
|
||||
"thread/resume overrides ignored for running thread {}: {}",
|
||||
existing_thread_id,
|
||||
mismatch_details.join("; ")
|
||||
);
|
||||
}
|
||||
|
||||
let Some(thread) = self
|
||||
.load_thread_from_rollout_or_send_internal(
|
||||
request_id.clone(),
|
||||
existing_thread_id,
|
||||
rollout_path.as_path(),
|
||||
config_snapshot.model_provider_id.as_str(),
|
||||
)
|
||||
.await
|
||||
else {
|
||||
return true;
|
||||
};
|
||||
|
||||
let ThreadConfigSnapshot {
|
||||
model,
|
||||
model_provider_id,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
cwd,
|
||||
reasoning_effort,
|
||||
..
|
||||
} = config_snapshot;
|
||||
let response = ThreadResumeResponse {
|
||||
thread,
|
||||
model,
|
||||
model_provider: model_provider_id,
|
||||
cwd,
|
||||
approval_policy: approval_policy.into(),
|
||||
sandbox: sandbox_policy.into(),
|
||||
reasoning_effort,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn resume_thread_from_history(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
history: &[ResponseItem],
|
||||
) -> Option<InitialHistory> {
|
||||
if history.is_empty() {
|
||||
self.send_invalid_request_error(request_id, "history must not be empty".to_string())
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
Some(InitialHistory::Forked(
|
||||
history
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn resume_thread_from_rollout(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
thread_id: &str,
|
||||
path: Option<&PathBuf>,
|
||||
) -> Option<InitialHistory> {
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path.clone()
|
||||
} else {
|
||||
let existing_thread_id = match ThreadId::from_string(thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid thread id: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match RolloutRecorder::get_rollout_history(&rollout_path).await {
|
||||
Ok(initial_history) => Some(initial_history),
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
)
|
||||
.await;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_thread_from_rollout_or_send_internal(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
thread_id: ThreadId,
|
||||
rollout_path: &Path,
|
||||
fallback_provider: &str,
|
||||
) -> Option<Thread> {
|
||||
let mut thread = match read_summary_from_rollout(rollout_path, fallback_provider).await {
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
match read_rollout_items_from_rollout(rollout_path).await {
|
||||
Ok(items) => {
|
||||
thread.turns = build_turns_from_rollout_items(&items);
|
||||
Some(thread)
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_fork(&mut self, request_id: ConnectionRequestId, params: ThreadForkParams) {
|
||||
let ThreadForkParams {
|
||||
thread_id,
|
||||
@@ -5808,6 +6007,101 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_resume_override_mismatches(
|
||||
request: &ThreadResumeParams,
|
||||
config_snapshot: &ThreadConfigSnapshot,
|
||||
) -> Vec<String> {
|
||||
let mut mismatch_details = Vec::new();
|
||||
|
||||
if let Some(requested_model) = request.model.as_deref()
|
||||
&& requested_model != config_snapshot.model
|
||||
{
|
||||
mismatch_details.push(format!(
|
||||
"model requested={requested_model} active={}",
|
||||
config_snapshot.model
|
||||
));
|
||||
}
|
||||
if let Some(requested_provider) = request.model_provider.as_deref()
|
||||
&& requested_provider != config_snapshot.model_provider_id
|
||||
{
|
||||
mismatch_details.push(format!(
|
||||
"model_provider requested={requested_provider} active={}",
|
||||
config_snapshot.model_provider_id
|
||||
));
|
||||
}
|
||||
if let Some(requested_cwd) = request.cwd.as_deref() {
|
||||
let requested_cwd_path = std::path::PathBuf::from(requested_cwd);
|
||||
if requested_cwd_path != config_snapshot.cwd {
|
||||
mismatch_details.push(format!(
|
||||
"cwd requested={} active={}",
|
||||
requested_cwd_path.display(),
|
||||
config_snapshot.cwd.display()
|
||||
));
|
||||
}
|
||||
}
|
||||
if let Some(requested_approval) = request.approval_policy.as_ref() {
|
||||
let active_approval: AskForApproval = config_snapshot.approval_policy.into();
|
||||
if requested_approval != &active_approval {
|
||||
mismatch_details.push(format!(
|
||||
"approval_policy requested={requested_approval:?} active={active_approval:?}"
|
||||
));
|
||||
}
|
||||
}
|
||||
if let Some(requested_sandbox) = request.sandbox.as_ref() {
|
||||
let sandbox_matches = matches!(
|
||||
(requested_sandbox, &config_snapshot.sandbox_policy),
|
||||
(
|
||||
SandboxMode::ReadOnly,
|
||||
codex_protocol::protocol::SandboxPolicy::ReadOnly { .. }
|
||||
) | (
|
||||
SandboxMode::WorkspaceWrite,
|
||||
codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { .. }
|
||||
) | (
|
||||
SandboxMode::DangerFullAccess,
|
||||
codex_protocol::protocol::SandboxPolicy::DangerFullAccess
|
||||
) | (
|
||||
SandboxMode::DangerFullAccess,
|
||||
codex_protocol::protocol::SandboxPolicy::ExternalSandbox { .. }
|
||||
)
|
||||
);
|
||||
if !sandbox_matches {
|
||||
mismatch_details.push(format!(
|
||||
"sandbox requested={requested_sandbox:?} active={:?}",
|
||||
config_snapshot.sandbox_policy
|
||||
));
|
||||
}
|
||||
}
|
||||
if let Some(requested_personality) = request.personality.as_ref()
|
||||
&& config_snapshot.personality.as_ref() != Some(requested_personality)
|
||||
{
|
||||
mismatch_details.push(format!(
|
||||
"personality requested={requested_personality:?} active={:?}",
|
||||
config_snapshot.personality
|
||||
));
|
||||
}
|
||||
|
||||
if request.config.is_some() {
|
||||
mismatch_details
|
||||
.push("config overrides were provided and ignored while running".to_string());
|
||||
}
|
||||
if request.base_instructions.is_some() {
|
||||
mismatch_details
|
||||
.push("baseInstructions override was provided and ignored while running".to_string());
|
||||
}
|
||||
if request.developer_instructions.is_some() {
|
||||
mismatch_details.push(
|
||||
"developerInstructions override was provided and ignored while running".to_string(),
|
||||
);
|
||||
}
|
||||
if request.persist_extended_history {
|
||||
mismatch_details.push(
|
||||
"persistExtendedHistory override was provided and ignored while running".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
mismatch_details
|
||||
}
|
||||
|
||||
fn skills_to_info(
|
||||
skills: &[codex_core::skills::SkillMetadata],
|
||||
disabled_paths: &std::collections::HashSet<PathBuf>,
|
||||
|
||||
Reference in New Issue
Block a user