fix: thread/list returning fewer than the requested amount due to filtering CXA-293 (#7509)

This caused some conversations to not appear when they otherwise should.

Prior to this change, `thread/list`/`list_conversations_common` would:
- Fetch N conversations from `RolloutRecorder::list_conversations`
- Then it would filter those (like by the provided `model_providers`)
- This would make it potentially return less than N items.

With this change:
- `list_conversations_common` now continues fetching more conversations
from `RolloutRecorder::list_conversations` until it "fills up" the
`requested_page_size`.
- Ultimately this means that clients can rely on getting eg 20
conversations if they request 20 conversations.
This commit is contained in:
Javi
2025-12-10 15:06:32 -08:00
committed by GitHub
parent 90f262e9a4
commit e2559ab28d
3 changed files with 400 additions and 120 deletions

View File

@@ -185,6 +185,9 @@ pub(crate) struct TurnSummary {
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ConversationId, TurnSummary>>>;
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;
// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
struct ActiveLogin {
@@ -1508,10 +1511,12 @@ impl CodexMessageProcessor {
model_providers,
} = params;
let page_size = limit.unwrap_or(25).max(1) as usize;
let requested_page_size = limit
.map(|value| value as usize)
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
.clamp(1, THREAD_LIST_MAX_LIMIT);
let (summaries, next_cursor) = match self
.list_conversations_common(page_size, cursor, model_providers)
.list_conversations_common(requested_page_size, cursor, model_providers)
.await
{
Ok(r) => r,
@@ -1522,7 +1527,6 @@ impl CodexMessageProcessor {
};
let data = summaries.into_iter().map(summary_to_thread).collect();
let response = ThreadListResponse { data, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
@@ -1800,10 +1804,12 @@ impl CodexMessageProcessor {
cursor,
model_providers,
} = params;
let page_size = page_size.unwrap_or(25).max(1);
let requested_page_size = page_size
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
.clamp(1, THREAD_LIST_MAX_LIMIT);
match self
.list_conversations_common(page_size, cursor, model_providers)
.list_conversations_common(requested_page_size, cursor, model_providers)
.await
{
Ok((items, next_cursor)) => {
@@ -1818,12 +1824,15 @@ impl CodexMessageProcessor {
async fn list_conversations_common(
&self,
page_size: usize,
requested_page_size: usize,
cursor: Option<String>,
model_providers: Option<Vec<String>>,
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
let cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let cursor_ref = cursor_obj.as_ref();
let mut cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let mut last_cursor = cursor_obj.clone();
let mut remaining = requested_page_size;
let mut items = Vec::with_capacity(requested_page_size);
let mut next_cursor: Option<String> = None;
let model_provider_filter = match model_providers {
Some(providers) => {
@@ -1837,48 +1846,69 @@ impl CodexMessageProcessor {
};
let fallback_provider = self.config.model_provider_id.clone();
let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
{
Ok(p) => p,
Err(err) => {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
});
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
let page = RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_obj.as_ref(),
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
})?;
let mut filtered = page
.items
.into_iter()
.filter_map(|it| {
let session_meta_line = it.head.first().and_then(|first| {
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
})?;
extract_conversation_summary(
it.path,
&it.head,
&session_meta_line.meta,
session_meta_line.git.as_ref(),
fallback_provider.as_str(),
)
})
.collect::<Vec<_>>();
if filtered.len() > remaining {
filtered.truncate(remaining);
}
};
items.extend(filtered);
remaining = requested_page_size.saturating_sub(items.len());
let items = page
.items
.into_iter()
.filter_map(|it| {
let session_meta_line = it.head.first().and_then(|first| {
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
})?;
extract_conversation_summary(
it.path,
&it.head,
&session_meta_line.meta,
session_meta_line.git.as_ref(),
fallback_provider.as_str(),
)
})
.collect::<Vec<_>>();
// Encode RolloutCursor into the JSON-RPC string form returned to clients.
let next_cursor_value = page.next_cursor.clone();
next_cursor = next_cursor_value
.as_ref()
.and_then(|cursor| serde_json::to_value(cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
if remaining == 0 {
break;
}
// Encode next_cursor as a plain string
let next_cursor = page
.next_cursor
.and_then(|cursor| serde_json::to_value(&cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
match next_cursor_value {
Some(cursor_val) if remaining > 0 => {
// Break if our pagination would reuse the same cursor again; this avoids
// an infinite loop when filtering drops everything on the page.
if last_cursor.as_ref() == Some(&cursor_val) {
next_cursor = None;
break;
}
last_cursor = Some(cursor_val.clone());
cursor_obj = Some(cursor_val);
}
_ => break,
}
}
Ok((items, next_cursor))
}

View File

@@ -358,3 +358,81 @@ async fn test_list_and_resume_conversations() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_conversations_fetches_through_filtered_pages() -> Result<()> {
let codex_home = TempDir::new()?;
// Only the last 3 conversations match the provider filter; request 3 and
// ensure pagination keeps fetching past non-matching pages.
let cases = [
(
"2025-03-04T12-00-00",
"2025-03-04T12:00:00Z",
"skip_provider",
),
(
"2025-03-03T12-00-00",
"2025-03-03T12:00:00Z",
"skip_provider",
),
(
"2025-03-02T12-00-00",
"2025-03-02T12:00:00Z",
"target_provider",
),
(
"2025-03-01T12-00-00",
"2025-03-01T12:00:00Z",
"target_provider",
),
(
"2025-02-28T12-00-00",
"2025-02-28T12:00:00Z",
"target_provider",
),
];
for (ts_file, ts_rfc, provider) in cases {
create_fake_rollout(
codex_home.path(),
ts_file,
ts_rfc,
"Hello",
Some(provider),
None,
)?;
}
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(3),
cursor: None,
model_providers: Some(vec!["target_provider".to_string()]),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ListConversationsResponse { items, next_cursor } =
to_response::<ListConversationsResponse>(resp)?;
assert_eq!(
items.len(),
3,
"should fetch across pages to satisfy the limit"
);
assert!(
items
.iter()
.all(|item| item.model_provider == "target_provider")
);
assert_eq!(next_cursor, None);
Ok(())
}

View File

@@ -6,37 +6,96 @@ use codex_app_server_protocol::GitInfo as ApiGitInfo;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_protocol::protocol::GitInfo as CoreGitInfo;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
async fn init_mcp(codex_home: &Path) -> Result<McpProcess> {
let mut mcp = McpProcess::new(codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
}
async fn list_threads(
mcp: &mut McpProcess,
cursor: Option<String>,
limit: Option<u32>,
providers: Option<Vec<String>>,
) -> Result<ThreadListResponse> {
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
cursor,
limit,
model_providers: providers,
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
to_response::<ThreadListResponse>(resp)
}
fn create_fake_rollouts<F, G>(
codex_home: &Path,
count: usize,
provider_for_index: F,
timestamp_for_index: G,
preview: &str,
) -> Result<Vec<String>>
where
F: Fn(usize) -> &'static str,
G: Fn(usize) -> (String, String),
{
let mut ids = Vec::with_capacity(count);
for i in 0..count {
let (ts_file, ts_rfc) = timestamp_for_index(i);
ids.push(create_fake_rollout(
codex_home,
&ts_file,
&ts_rfc,
preview,
Some(provider_for_index(i)),
None,
)?);
}
Ok(ids)
}
fn timestamp_at(
year: i32,
month: u32,
day: u32,
hour: u32,
minute: u32,
second: u32,
) -> (String, String) {
(
format!("{year:04}-{month:02}-{day:02}T{hour:02}-{minute:02}-{second:02}"),
format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z"),
)
}
#[tokio::test]
async fn thread_list_basic_empty() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let mut mcp = init_mcp(codex_home.path()).await?;
// List threads in an empty CODEX_HOME; should return an empty page with nextCursor: null.
let list_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(10),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
let ThreadListResponse { data, next_cursor } = list_threads(
&mut mcp,
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
)
.await??;
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(list_resp)?;
.await?;
assert!(data.is_empty());
assert_eq!(next_cursor, None);
@@ -86,26 +145,19 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let mut mcp = init_mcp(codex_home.path()).await?;
// Page 1: limit 2 → expect next_cursor Some.
let page1_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(2),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let page1_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(page1_id)),
)
.await??;
let ThreadListResponse {
data: data1,
next_cursor: cursor1,
} = to_response::<ThreadListResponse>(page1_resp)?;
} = list_threads(
&mut mcp,
None,
Some(2),
Some(vec!["mock_provider".to_string()]),
)
.await?;
assert_eq!(data1.len(), 2);
for thread in &data1 {
assert_eq!(thread.preview, "Hello");
@@ -119,22 +171,16 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
let cursor1 = cursor1.expect("expected nextCursor on first page");
// Page 2: with cursor → expect next_cursor None when no more results.
let page2_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: Some(cursor1),
limit: Some(2),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let page2_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(page2_id)),
)
.await??;
let ThreadListResponse {
data: data2,
next_cursor: cursor2,
} = to_response::<ThreadListResponse>(page2_resp)?;
} = list_threads(
&mut mcp,
Some(cursor1),
Some(2),
Some(vec!["mock_provider".to_string()]),
)
.await?;
assert!(data2.len() <= 2);
for thread in &data2 {
assert_eq!(thread.preview, "Hello");
@@ -173,23 +219,16 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let mut mcp = init_mcp(codex_home.path()).await?;
// Filter to only other_provider; expect 1 item, nextCursor None.
let list_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(10),
model_providers: Some(vec!["other_provider".to_string()]),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
let ThreadListResponse { data, next_cursor } = list_threads(
&mut mcp,
None,
Some(10),
Some(vec!["other_provider".to_string()]),
)
.await??;
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
.await?;
assert_eq!(data.len(), 1);
assert_eq!(next_cursor, None);
let thread = &data[0];
@@ -205,6 +244,146 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_list_fetches_until_limit_or_exhausted() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Newest 16 conversations belong to a different provider; the older 8 are the
// only ones that match the filter. We request 8 so the server must keep
// paging past the first two pages to reach the desired count.
create_fake_rollouts(
codex_home.path(),
24,
|i| {
if i < 16 {
"skip_provider"
} else {
"target_provider"
}
},
|i| timestamp_at(2025, 3, 30 - i as u32, 12, 0, 0),
"Hello",
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
// Request 8 threads for the target provider; the matches only start on the
// third page so we rely on pagination to reach the limit.
let ThreadListResponse { data, next_cursor } = list_threads(
&mut mcp,
None,
Some(8),
Some(vec!["target_provider".to_string()]),
)
.await?;
assert_eq!(
data.len(),
8,
"should keep paging until the requested count is filled"
);
assert!(
data.iter()
.all(|thread| thread.model_provider == "target_provider"),
"all returned threads must match the requested provider"
);
assert_eq!(
next_cursor, None,
"once the requested count is satisfied on the final page, nextCursor should be None"
);
Ok(())
}
#[tokio::test]
async fn thread_list_enforces_max_limit() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
create_fake_rollouts(
codex_home.path(),
105,
|_| "mock_provider",
|i| {
let month = 5 + (i / 28);
let day = (i % 28) + 1;
timestamp_at(2025, month as u32, day as u32, 0, 0, 0)
},
"Hello",
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
let ThreadListResponse { data, next_cursor } = list_threads(
&mut mcp,
None,
Some(200),
Some(vec!["mock_provider".to_string()]),
)
.await?;
assert_eq!(
data.len(),
100,
"limit should be clamped to the maximum page size"
);
assert!(
next_cursor.is_some(),
"when more than the maximum exist, nextCursor should continue pagination"
);
Ok(())
}
#[tokio::test]
async fn thread_list_stops_when_not_enough_filtered_results_exist() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Only the last 7 conversations match the provider filter; we ask for 10 to
// ensure the server exhausts pagination without looping forever.
create_fake_rollouts(
codex_home.path(),
22,
|i| {
if i < 15 {
"skip_provider"
} else {
"target_provider"
}
},
|i| timestamp_at(2025, 4, 28 - i as u32, 8, 0, 0),
"Hello",
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
// Request more threads than exist after filtering; expect all matches to be
// returned with nextCursor None.
let ThreadListResponse { data, next_cursor } = list_threads(
&mut mcp,
None,
Some(10),
Some(vec!["target_provider".to_string()]),
)
.await?;
assert_eq!(
data.len(),
7,
"all available filtered threads should be returned"
);
assert!(
data.iter()
.all(|thread| thread.model_provider == "target_provider"),
"results should still respect the provider filter"
);
assert_eq!(
next_cursor, None,
"when results are exhausted before reaching the limit, nextCursor should be None"
);
Ok(())
}
#[tokio::test]
async fn thread_list_includes_git_info() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -224,22 +403,15 @@ async fn thread_list_includes_git_info() -> Result<()> {
Some(git_info),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let mut mcp = init_mcp(codex_home.path()).await?;
let list_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(10),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
let ThreadListResponse { data, .. } = list_threads(
&mut mcp,
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
)
.await??;
let ThreadListResponse { data, .. } = to_response::<ThreadListResponse>(resp)?;
.await?;
let thread = data
.iter()
.find(|t| t.id == conversation_id)