Import external agent sessions in background (#20284)

Summary:
- Return from external agent import before session history import
finishes
- Run session import work in the background and emit the existing
completion notification when it is done
- Serialize session imports so duplicate requests do not create
duplicate imported threads

Verification:
- cargo test -p codex-app-server external_agent_config_
- cargo test -p codex-external-agent-sessions
- just fix -p codex-app-server
- just fix -p codex-external-agent-sessions
- git diff --check
This commit is contained in:
stefanstokic-oai
2026-04-29 17:00:41 -07:00
committed by GitHub
parent 7bcd4626c4
commit c8abcbf925
8 changed files with 465 additions and 57 deletions

View File

@@ -26,6 +26,8 @@ use core_test_support::responses;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
use tempfile::TempDir;
#[cfg(unix)]
use tokio::io::AsyncWriteExt;
use tokio::time::timeout;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
@@ -271,6 +273,12 @@ async fn external_agent_config_import_creates_session_rollouts() -> Result<()> {
.await??;
let response: ExternalAgentConfigImportResponse = to_response(response)?;
assert_eq!(response, ExternalAgentConfigImportResponse {});
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
let request_id = mcp
.send_thread_list_request(ThreadListParams {
@@ -380,6 +388,92 @@ async fn external_agent_config_import_creates_session_rollouts() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn external_agent_config_import_accepts_detected_session_payload_after_restart() -> Result<()>
{
let server = create_mock_responses_server_repeating_assistant("unused").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let project_root = codex_home.path().join("repo");
let recent_timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let session_dir = codex_home.path().join(".claude/projects/repo");
let session_path = session_dir.join("session.jsonl");
std::fs::create_dir_all(&project_root)?;
std::fs::create_dir_all(&session_dir)?;
std::fs::write(
&session_path,
serde_json::json!({
"type": "user",
"cwd": &project_root,
"timestamp": &recent_timestamp,
"message": { "content": "first request" },
})
.to_string(),
)?;
let home_dir = codex_home.path().display().to_string();
let mut mcp =
McpProcess::new_with_env(codex_home.path(), &[("HOME", Some(home_dir.as_str()))]).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_raw_request(
"externalAgentConfig/import",
Some(serde_json::json!({
"migrationItems": [{
"itemType": "SESSIONS",
"description": "Migrate recent sessions",
"cwd": null,
"details": {
"sessions": [{
"path": session_path,
"cwd": project_root,
"title": "first request"
}]
}
}]
})),
)
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ExternalAgentConfigImportResponse = to_response(response)?;
assert_eq!(response, ExternalAgentConfigImportResponse {});
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
let request_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: None,
sort_key: None,
sort_direction: None,
model_providers: None,
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ThreadListResponse = to_response(response)?;
assert_eq!(response.data.len(), 1);
Ok(())
}
#[tokio::test]
async fn external_agent_config_import_skips_already_imported_session_versions() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
@@ -433,6 +527,12 @@ async fn external_agent_config_import_skips_already_imported_session_versions()
)
.await??;
let _: ExternalAgentConfigImportResponse = to_response(response)?;
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
}
let request_id = mcp
@@ -460,6 +560,144 @@ async fn external_agent_config_import_skips_already_imported_session_versions()
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn external_agent_config_import_returns_before_background_session_import_finishes()
-> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let project_root = codex_home.path().join("repo");
let recent_timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let session_dir = codex_home.path().join(".claude/projects/repo");
let session_path = session_dir.join("session.jsonl");
std::fs::create_dir_all(&project_root)?;
std::fs::create_dir_all(&session_dir)?;
std::fs::write(
&session_path,
serde_json::json!({
"type": "user",
"cwd": &project_root,
"timestamp": &recent_timestamp,
"message": { "content": "first request" },
})
.to_string(),
)?;
let project_config_dir = project_root.join(".codex");
std::fs::create_dir_all(&project_config_dir)?;
let project_config = project_config_dir.join("config.toml");
let status = std::process::Command::new("mkfifo")
.arg(&project_config)
.status()?;
assert!(status.success());
let home_dir = codex_home.path().display().to_string();
let mut mcp =
McpProcess::new_with_env(codex_home.path(), &[("HOME", Some(home_dir.as_str()))]).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_raw_request(
"externalAgentConfig/detect",
Some(serde_json::json!({ "includeHome": true })),
)
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let detected: ExternalAgentConfigDetectResponse = to_response(response)?;
assert_eq!(detected.items.len(), 1);
let detected_items = detected.items;
let request_id = mcp
.send_raw_request(
"externalAgentConfig/import",
Some(serde_json::json!({ "migrationItems": detected_items.clone() })),
)
.await?;
let response: JSONRPCResponse = timeout(
Duration::from_secs(5),
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ExternalAgentConfigImportResponse = to_response(response)?;
assert_eq!(response, ExternalAgentConfigImportResponse {});
assert!(
timeout(
Duration::from_millis(200),
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed")
)
.await
.is_err(),
"session import completed before the blocked background import was unblocked"
);
let duplicate_request_id = mcp
.send_raw_request(
"externalAgentConfig/import",
Some(serde_json::json!({ "migrationItems": detected_items })),
)
.await?;
let response: JSONRPCResponse = timeout(
Duration::from_secs(5),
mcp.read_stream_until_response_message(RequestId::Integer(duplicate_request_id)),
)
.await??;
let response: ExternalAgentConfigImportResponse = to_response(response)?;
assert_eq!(response, ExternalAgentConfigImportResponse {});
let writer = tokio::spawn(async move {
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.open(&project_config)
.await?;
file.write_all(b"\n").await
});
timeout(DEFAULT_TIMEOUT, writer).await???;
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
let request_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: None,
sort_key: None,
sort_direction: None,
model_providers: None,
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ThreadListResponse = to_response(response)?;
assert_eq!(response.data.len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn external_agent_config_import_rejects_undetected_session_paths() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
@@ -635,6 +873,12 @@ async fn external_agent_config_import_compacts_huge_session_before_first_follow_
)
.await??;
let _: ExternalAgentConfigImportResponse = to_response(response)?;
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("externalAgentConfig/import/completed"),
)
.await??;
assert_eq!(notification.method, "externalAgentConfig/import/completed");
let request_id = mcp
.send_thread_list_request(ThreadListParams {