feat: add slug in name (#11739)

This commit is contained in:
jif-oai
2026-02-13 15:24:03 +00:00
committed by GitHub
parent bc80a4a8ed
commit db66d827be
7 changed files with 237 additions and 22 deletions

View File

@@ -69,10 +69,9 @@ struct StageOneOutput {
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
pub(crate) rollout_summary: String,
/// Optional slug accepted from stage-1 output for forward compatibility.
/// This is currently ignored by downstream storage and naming, which remain thread-id based.
/// Optional slug used to derive rollout summary artifact filenames.
#[serde(default, rename = "rollout_slug")]
pub(crate) _rollout_slug: Option<String>,
pub(crate) rollout_slug: Option<String>,
}
/// Runs memory phase 1 in strict step order:
@@ -122,7 +121,7 @@ pub fn output_schema() -> Value {
"rollout_slug": { "type": "string" },
"raw_memory": { "type": "string" }
},
"required": ["rollout_summary", "rollout_slug", "raw_memory"],
"required": ["rollout_summary", "raw_memory"],
"additionalProperties": false
})
}
@@ -268,6 +267,7 @@ mod job {
thread.updated_at.timestamp(),
&stage_one_output.raw_memory,
&stage_one_output.rollout_summary,
stage_one_output.rollout_slug.as_deref(),
)
.await,
token_usage,
@@ -348,6 +348,7 @@ mod job {
let mut output: StageOneOutput = serde_json::from_str(&result)?;
output.raw_memory = redact_secrets(output.raw_memory);
output.rollout_summary = redact_secrets(output.rollout_summary);
output.rollout_slug = output.rollout_slug.map(redact_secrets);
Ok((output, token_usage))
}
@@ -401,6 +402,7 @@ mod job {
source_updated_at: i64,
raw_memory: &str,
rollout_summary: &str,
rollout_slug: Option<&str>,
) -> JobOutcome {
let Some(state_db) = session.services.state_db.as_deref() else {
return JobOutcome::Failed;
@@ -413,6 +415,7 @@ mod job {
source_updated_at,
raw_memory,
rollout_summary,
rollout_slug,
)
.await
.unwrap_or(false)

View File

@@ -34,7 +34,7 @@ pub(super) async fn sync_rollout_summaries_from_memories(
.collect::<Vec<_>>();
let keep = retained
.iter()
.map(|memory| memory.thread_id.to_string())
.map(|memory| rollout_summary_file_stem(memory))
.collect::<BTreeSet<_>>();
prune_rollout_summaries(root, &keep).await?;
@@ -113,10 +113,10 @@ async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet<String>) -> std::i
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
let Some(thread_id) = extract_thread_id_from_rollout_summary_filename(file_name) else {
let Some(stem) = file_name.strip_suffix(".md") else {
continue;
};
if !keep.contains(thread_id)
if !keep.contains(stem)
&& let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
@@ -134,7 +134,8 @@ async fn write_rollout_summary_for_thread(
root: &Path,
memory: &Stage1Output,
) -> std::io::Result<()> {
let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id));
let file_stem = rollout_summary_file_stem(memory);
let path = rollout_summaries_dir(root).join(format!("{file_stem}.md"));
let mut body = String::new();
writeln!(body, "thread_id: {}", memory.thread_id)
@@ -155,7 +156,85 @@ async fn write_rollout_summary_for_thread(
tokio::fs::write(path, body).await
}
fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> {
let stem = file_name.strip_suffix(".md")?;
if stem.is_empty() { None } else { Some(stem) }
/// Derives the rollout summary artifact file stem for a memory.
///
/// The stem is the thread id, optionally suffixed with a sanitized slug:
/// alphanumerics are lowercased, every other character becomes `_`, the
/// result is capped at 20 characters, and trailing underscores are
/// stripped. When no usable slug remains, the bare thread id is returned.
fn rollout_summary_file_stem(memory: &Stage1Output) -> String {
    const ROLLOUT_SLUG_MAX_LEN: usize = 20;
    let thread_id = memory.thread_id.to_string();
    match memory.rollout_slug.as_deref() {
        None => thread_id,
        Some(raw_slug) => {
            // Sanitize and cap in one pass. Every emitted character is
            // single-byte ASCII, so char count equals byte length here.
            let sanitized: String = raw_slug
                .chars()
                .take(ROLLOUT_SLUG_MAX_LEN)
                .map(|ch| {
                    if ch.is_ascii_alphanumeric() {
                        ch.to_ascii_lowercase()
                    } else {
                        '_'
                    }
                })
                .collect();
            // Trailing separators carry no information in a filename.
            let trimmed = sanitized.trim_end_matches('_');
            if trimmed.is_empty() {
                thread_id
            } else {
                format!("{thread_id}-{trimmed}")
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::rollout_summary_file_stem;
    use chrono::TimeZone;
    use chrono::Utc;
    use codex_protocol::ThreadId;
    use codex_state::Stage1Output;
    use pretty_assertions::assert_eq;
    use std::path::PathBuf;

    /// Minimal `Stage1Output` fixture; only the slug varies across tests.
    fn memory_with_slug(slug: Option<&str>) -> Stage1Output {
        Stage1Output {
            thread_id: ThreadId::new(),
            source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"),
            raw_memory: "raw memory".to_string(),
            rollout_summary: "summary".to_string(),
            rollout_slug: slug.map(str::to_string),
            cwd: PathBuf::from("/tmp/workspace"),
            generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
        }
    }

    #[test]
    fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() {
        let memory = memory_with_slug(None);
        let expected = memory.thread_id.to_string();
        assert_eq!(rollout_summary_file_stem(&memory), expected);
    }

    #[test]
    fn rollout_summary_file_stem_sanitizes_and_truncates_slug() {
        let raw = "Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345";
        let memory = memory_with_slug(Some(raw));
        let expected = format!("{}-unsafe_slug_with_spa", memory.thread_id);
        assert_eq!(rollout_summary_file_stem(&memory), expected);
    }

    #[test]
    fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() {
        let memory = memory_with_slug(Some(""));
        let expected = memory.thread_id.to_string();
        assert_eq!(rollout_summary_file_stem(&memory), expected);
    }
}

View File

@@ -22,7 +22,7 @@ fn memory_root_uses_shared_global_path() {
}
#[test]
fn stage_one_output_schema_requires_all_declared_properties() {
fn stage_one_output_schema_keeps_rollout_slug_optional() {
let schema = crate::memories::phase1::output_schema();
let properties = schema
.get("properties")
@@ -33,16 +33,17 @@ fn stage_one_output_schema_requires_all_declared_properties() {
.and_then(Value::as_array)
.expect("required array");
let mut property_keys = properties.keys().map(String::as_str).collect::<Vec<_>>();
property_keys.sort_unstable();
let mut required_keys = required
.iter()
.map(|key| key.as_str().expect("required key string"))
.collect::<Vec<_>>();
required_keys.sort_unstable();
assert_eq!(required_keys, property_keys);
assert!(
properties.contains_key("rollout_slug"),
"schema should declare rollout_slug"
);
assert_eq!(required_keys, vec!["raw_memory", "rollout_summary"]);
}
#[tokio::test]
@@ -67,6 +68,7 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: None,
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
@@ -97,6 +99,83 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(raw_memories.contains("cwd: /tmp/workspace"));
}
// Verifies that a sync writes exactly one `<thread_id>-<sanitized_slug>.md`
// summary file and prunes stale files for the same thread, whether they used
// the old unslugged name or a previous slug.
#[tokio::test]
async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
    let dir = tempdir().expect("tempdir");
    let root = dir.path().join("memory");
    ensure_layout(&root).await.expect("ensure layout");
    let thread_id = ThreadId::new();
    // Pre-seed stale artifacts a previous sync could have left for this thread.
    let stale_unslugged_path = rollout_summaries_dir(&root).join(format!("{thread_id}.md"));
    let stale_old_slug_path =
        rollout_summaries_dir(&root).join(format!("{thread_id}--old-slug.md"));
    tokio::fs::write(&stale_unslugged_path, "stale")
        .await
        .expect("write stale unslugged file");
    tokio::fs::write(&stale_old_slug_path, "stale")
        .await
        .expect("write stale old-slug file");
    // The slug deliberately mixes case, separators, and symbols so the
    // sanitization path is exercised end to end.
    let memories = vec![Stage1Output {
        thread_id,
        source_updated_at: Utc.timestamp_opt(200, 0).single().expect("timestamp"),
        raw_memory: "raw memory".to_string(),
        rollout_summary: "short summary".to_string(),
        rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()),
        cwd: PathBuf::from("/tmp/workspace"),
        generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"),
    }];
    sync_rollout_summaries_from_memories(
        &root,
        &memories,
        DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
    )
    .await
    .expect("sync rollout summaries");
    // Collect every file left in the summaries dir (shadows the tempdir handle).
    let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root))
        .await
        .expect("open rollout summaries dir");
    let mut files = Vec::new();
    while let Some(entry) = dir.next_entry().await.expect("read dir entry") {
        files.push(entry.file_name().to_string_lossy().to_string());
    }
    files.sort_unstable();
    assert_eq!(files.len(), 1);
    let file_name = &files[0];
    let stem = file_name
        .strip_suffix(".md")
        .expect("rollout summary file should end with .md");
    let slug = stem
        .strip_prefix(&format!("{thread_id}-"))
        .expect("rollout summary filename should include thread id and slug");
    // Only the filename contract is pinned here (length cap + file-safe
    // charset); the exact slug text is covered by the unit tests.
    assert!(slug.len() <= 20, "slug should be capped at 20 chars");
    assert!(
        slug.chars()
            .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'),
        "slug should be file-safe lowercase ascii with underscores"
    );
    let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name))
        .await
        .expect("read rollout summary");
    assert!(summary.contains(&format!("thread_id: {thread_id}")));
    // Both stale filenames for the same thread must be gone after the sync.
    assert!(
        !tokio::fs::try_exists(&stale_unslugged_path)
            .await
            .expect("check stale unslugged path"),
        "slugged sync should prune stale unslugged filename for same thread"
    );
    assert!(
        !tokio::fs::try_exists(&stale_old_slug_path)
            .await
            .expect("check stale old slug path"),
        "slugged sync should prune stale slugged filename for same thread"
    );
}
mod phase2 {
use crate::CodexAuth;
use crate::ThreadManager;
@@ -130,6 +209,7 @@ mod phase2 {
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
rollout_slug: None,
cwd: PathBuf::from("/tmp/workspace"),
generated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at + 1, 0)
.expect("valid generated_at timestamp"),
@@ -220,6 +300,7 @@ mod phase2 {
source_updated_at,
"raw memory",
"rollout summary",
None,
)
.await
.expect("mark stage-1 success"),
@@ -572,6 +653,7 @@ mod phase2 {
100,
"raw memory",
"rollout summary",
None,
)
.await
.expect("mark stage-1 success"),

View File

@@ -0,0 +1,2 @@
-- Adds the optional stage-1 rollout slug, used downstream to derive
-- rollout summary artifact filenames; NULL when no slug was produced.
ALTER TABLE stage1_outputs
ADD COLUMN rollout_slug TEXT;

View File

@@ -15,6 +15,7 @@ pub struct Stage1Output {
pub source_updated_at: DateTime<Utc>,
pub raw_memory: String,
pub rollout_summary: String,
pub rollout_slug: Option<String>,
pub cwd: PathBuf,
pub generated_at: DateTime<Utc>,
}
@@ -25,6 +26,7 @@ pub(crate) struct Stage1OutputRow {
source_updated_at: i64,
raw_memory: String,
rollout_summary: String,
rollout_slug: Option<String>,
cwd: String,
generated_at: i64,
}
@@ -36,6 +38,7 @@ impl Stage1OutputRow {
source_updated_at: row.try_get("source_updated_at")?,
raw_memory: row.try_get("raw_memory")?,
rollout_summary: row.try_get("rollout_summary")?,
rollout_slug: row.try_get("rollout_slug")?,
cwd: row.try_get("cwd")?,
generated_at: row.try_get("generated_at")?,
})
@@ -51,6 +54,7 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
raw_memory: row.raw_memory,
rollout_summary: row.rollout_summary,
rollout_slug: row.rollout_slug,
cwd: PathBuf::from(row.cwd),
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})

View File

@@ -1143,7 +1143,14 @@ WHERE id = 1
assert!(
runtime
.mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum")
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
100,
"raw",
"sum",
None,
)
.await
.expect("mark stage1 succeeded"),
"stage1 success should finalize for current token"
@@ -1492,6 +1499,7 @@ WHERE id = 1
up_to_date.updated_at.timestamp(),
"raw",
"summary",
None,
)
.await
.expect("mark up-to-date thread succeeded"),
@@ -1715,6 +1723,7 @@ WHERE kind = 'memory_stage1'
claim.thread.updated_at.timestamp(),
"raw",
"summary",
None,
)
.await
.expect("mark first-batch stage1 success"),
@@ -1766,7 +1775,14 @@ WHERE kind = 'memory_stage1'
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum")
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
100,
"raw",
"sum",
None,
)
.await
.expect("mark stage1 succeeded"),
"mark stage1 succeeded should write stage1_outputs"
@@ -2058,6 +2074,7 @@ WHERE kind = 'memory_stage1'
100,
"raw memory a",
"summary a",
None,
)
.await
.expect("mark stage1 succeeded a"),
@@ -2080,6 +2097,7 @@ WHERE kind = 'memory_stage1'
101,
"raw memory b",
"summary b",
Some("rollout-b"),
)
.await
.expect("mark stage1 succeeded b"),
@@ -2093,9 +2111,11 @@ WHERE kind = 'memory_stage1'
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].rollout_summary, "summary b");
assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b"));
assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
assert_eq!(outputs[1].thread_id, thread_id_a);
assert_eq!(outputs[1].rollout_summary, "summary a");
assert_eq!(outputs[1].rollout_slug, None);
assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
@@ -2208,7 +2228,14 @@ VALUES (?, ?, ?, ?, ?)
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_a, token_a.as_str(), 100, "raw-a", "summary-a")
.mark_stage1_job_succeeded(
thread_a,
token_a.as_str(),
100,
"raw-a",
"summary-a",
None,
)
.await
.expect("mark stage1 succeeded a"),
"stage1 success should persist output for thread a"
@@ -2224,7 +2251,14 @@ VALUES (?, ?, ?, ?, ?)
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_b, token_b.as_str(), 101, "raw-b", "summary-b")
.mark_stage1_job_succeeded(
thread_b,
token_b.as_str(),
101,
"raw-b",
"summary-b",
None,
)
.await
.expect("mark stage1 succeeded b"),
"stage1 success should persist output for thread b"

View File

@@ -191,7 +191,13 @@ LEFT JOIN jobs
let rows = sqlx::query(
r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at
SELECT
so.thread_id,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.generated_at
, COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
@@ -407,6 +413,7 @@ WHERE kind = ? AND job_key = ?
/// - sets `status='done'` and `last_success_watermark = input_watermark`
/// - upserts `stage1_outputs` for the thread, replacing existing output only
/// when `source_updated_at` is newer or equal
/// - persists optional `rollout_slug` for rollout summary artifact naming
/// - enqueues/advances the global phase-2 job watermark using
/// `source_updated_at`
pub async fn mark_stage1_job_succeeded(
@@ -416,6 +423,7 @@ WHERE kind = ? AND job_key = ?
source_updated_at: i64,
raw_memory: &str,
rollout_summary: &str,
rollout_slug: Option<&str>,
) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
let thread_id = thread_id.to_string();
@@ -454,12 +462,14 @@ INSERT INTO stage1_outputs (
source_updated_at,
raw_memory,
rollout_summary,
rollout_slug,
generated_at
) VALUES (?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
source_updated_at = excluded.source_updated_at,
raw_memory = excluded.raw_memory,
rollout_summary = excluded.rollout_summary,
rollout_slug = excluded.rollout_slug,
generated_at = excluded.generated_at
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
"#,
@@ -468,6 +478,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
.bind(source_updated_at)
.bind(raw_memory)
.bind(rollout_summary)
.bind(rollout_slug)
.bind(now)
.execute(&mut *tx)
.await?;