add tests

This commit is contained in:
jif-oai
2026-02-25 16:36:55 +00:00
parent fa7c298f68
commit de11fbb068
2 changed files with 196 additions and 7 deletions

View File

@@ -13,7 +13,6 @@ use crate::error::Result;
use crate::function_tool::FunctionCallError;
use crate::memories::citations::get_thread_id_from_citations;
use crate::parse_turn_item;
use crate::state_db;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputBody;
@@ -58,13 +57,10 @@ pub(crate) async fn record_completed_response_item(
) {
sess.record_conversation_items(turn_context, std::slice::from_ref(item))
.await;
record_stage1_output_usage_for_completed_item(turn_context, item).await;
record_stage1_output_usage_for_completed_item(sess, item).await;
}
async fn record_stage1_output_usage_for_completed_item(
turn_context: &TurnContext,
item: &ResponseItem,
) {
async fn record_stage1_output_usage_for_completed_item(sess: &Session, item: &ResponseItem) {
let Some(raw_text) = raw_assistant_output_text_from_item(item) else {
return;
};
@@ -75,7 +71,7 @@ async fn record_stage1_output_usage_for_completed_item(
return;
}
if let Some(db) = state_db::get_state_db(turn_context.config.as_ref(), None).await {
if let Some(db) = sess.services.state_db.as_deref() {
let _ = db.record_stage1_output_usage(&thread_ids).await;
}
}

View File

@@ -1,4 +1,6 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use codex_core::features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
@@ -10,11 +12,14 @@ use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use core_test_support::responses;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::json;
@@ -253,6 +258,117 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn assistant_memory_citations_update_usage_and_reorder_phase2_selection() -> Result<()> {
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Sqlite);
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let owner = test.session_configured.session_id;
let cited_thread = ThreadId::new();
let uncited_thread = ThreadId::new();
seed_stage1_output(&test, cited_thread, owner, "workspace-cited", 100).await?;
seed_stage1_output(&test, uncited_thread, owner, "workspace-uncited", 200).await?;
let initial_selection = db
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
.await?;
assert_eq!(
initial_selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![uncited_thread]
);
mark_phase2_clean(db.as_ref(), owner).await?;
let assistant_text = format!(
"Using a prior memory.\n<oai-mem-citation>\n<citation_entries>\nrollout_summaries/test.md:1-2|note=[integration]\n</citation_entries>\n<rollout_ids>\n{cited_thread}\n</rollout_ids>\n</oai-mem-citation>"
);
mount_sse_once(
&server,
responses::sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", &assistant_text),
ev_completed("resp-1"),
]),
)
.await;
test.submit_turn("answer using memory").await?;
let usage_only_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
let (ownership_token, input_watermark) = match usage_only_claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
other => panic!("expected usage-only phase2 claim, got {other:?}"),
};
assert!(
db.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
.await?,
"usage-only citation should re-dirty phase 2"
);
let updated_selection = db
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
.await?;
assert_eq!(
updated_selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![cited_thread]
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn assistant_messages_without_memory_citations_do_not_redirty_phase2() -> Result<()> {
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Sqlite);
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let owner = test.session_configured.session_id;
let thread_id = ThreadId::new();
seed_stage1_output(&test, thread_id, owner, "workspace-plain", 100).await?;
mark_phase2_clean(db.as_ref(), owner).await?;
mount_sse_once(
&server,
responses::sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "No memory references here."),
ev_completed("resp-1"),
]),
)
.await;
test.submit_turn("answer without memory").await?;
let phase2_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
assert!(
matches!(
phase2_claim,
codex_state::Phase2JobClaimOutcome::SkippedNotDirty
),
"non-citation assistant output should not enqueue phase 2"
);
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn tool_call_logs_include_thread_id() -> Result<()> {
let server = start_mock_server().await;
@@ -326,3 +442,80 @@ async fn tool_call_logs_include_thread_id() -> Result<()> {
Ok(())
}
async fn seed_stage1_output(
test: &TestCodex,
thread_id: ThreadId,
owner: ThreadId,
workspace_name: &str,
source_updated_at: i64,
) -> Result<()> {
let db = test.codex.state_db().expect("state db enabled");
let metadata = seeded_thread_metadata(
test,
thread_id,
workspace_name,
DateTime::<Utc>::from_timestamp(source_updated_at, 0).expect("timestamp"),
);
db.upsert_thread(&metadata).await?;
let claim = db
.try_claim_stage1_job(thread_id, owner, source_updated_at, 3600, 64)
.await?;
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("expected stage1 claim, got {other:?}"),
};
assert!(
db.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
source_updated_at,
"raw memory",
"rollout summary",
None,
)
.await?,
"stage1 success should persist output"
);
Ok(())
}
async fn mark_phase2_clean(db: &codex_state::StateRuntime, owner: ThreadId) -> Result<()> {
let claim = db.try_claim_global_phase2_job(owner, 3600).await?;
let (ownership_token, input_watermark) = match claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
other => panic!("expected phase2 claim, got {other:?}"),
};
assert!(
db.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
.await?,
"phase2 success should clear dirty state"
);
Ok(())
}
/// Test helper: builds `ThreadMetadata` for a synthetic thread.
///
/// The rollout path is `sessions/seed-{thread_id}.jsonl` under the test's `codex_home`,
/// `cwd` is `codex_home/<workspace_name>`, `updated_at` is set to `timestamp`, and
/// `cli_version` is `"test"`. `timestamp` is also passed to the builder's constructor.
fn seeded_thread_metadata(
    test: &TestCodex,
    thread_id: ThreadId,
    workspace_name: &str,
    timestamp: DateTime<Utc>,
) -> codex_state::ThreadMetadata {
    let codex_home = &test.config.codex_home;
    let rollout_path = codex_home.join(format!("sessions/seed-{thread_id}.jsonl"));

    let mut metadata_builder = codex_state::ThreadMetadataBuilder::new(
        thread_id,
        rollout_path,
        timestamp,
        SessionSource::default(),
    );
    metadata_builder.updated_at = Some(timestamp);
    metadata_builder.cwd = codex_home.join(workspace_name);
    metadata_builder.cli_version = Some(String::from("test"));

    metadata_builder.build(&test.config.model_provider_id)
}