mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
rollout
This commit is contained in:
@@ -130,7 +130,6 @@ async fn integration_creates_and_checks_session_file() {
|
||||
// 2. Unique marker we'll look for in the session log.
|
||||
let marker = format!("integration-test-{}", Uuid::new_v4());
|
||||
let prompt = format!("echo {marker}");
|
||||
eprintln!("prompt: {prompt}");
|
||||
|
||||
// 3. Use the same offline SSE fixture as responses_api_stream_cli so the test is hermetic.
|
||||
let fixture =
|
||||
@@ -163,7 +162,6 @@ async fn integration_creates_and_checks_session_file() {
|
||||
while !sessions_dir.exists() && Instant::now() < dir_deadline {
|
||||
std::thread::sleep(Duration::from_millis(50));
|
||||
}
|
||||
eprintln!("sessions_dir: {sessions_dir:?}");
|
||||
assert!(sessions_dir.exists(), "sessions directory never appeared");
|
||||
|
||||
// Find the session file that contains `marker`.
|
||||
@@ -174,39 +172,30 @@ async fn integration_creates_and_checks_session_file() {
|
||||
let entry = match entry {
|
||||
Ok(e) => e,
|
||||
Err(_) => {
|
||||
eprintln!("error walking dir: {entry:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if !entry.file_type().is_file() {
|
||||
eprintln!("not a file: {entry:?}");
|
||||
continue;
|
||||
}
|
||||
if !entry.file_name().to_string_lossy().ends_with(".jsonl") {
|
||||
eprintln!("not a jsonl file: {entry:?}");
|
||||
continue;
|
||||
}
|
||||
let path = entry.path();
|
||||
let Ok(content) = std::fs::read_to_string(path) else {
|
||||
eprintln!("error reading file: {path:?}");
|
||||
continue;
|
||||
};
|
||||
let mut lines = content.lines();
|
||||
if lines.next().is_none() {
|
||||
eprintln!("no lines in file: {path:?}");
|
||||
continue;
|
||||
}
|
||||
eprintln!("lines: {lines:?}");
|
||||
for line in lines {
|
||||
eprintln!("line: {line:?}");
|
||||
if line.trim().is_empty() {
|
||||
eprintln!("empty line in file: {path:?}");
|
||||
continue;
|
||||
}
|
||||
let item: serde_json::Value = match serde_json::from_str(line) {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
eprintln!("error parsing line as json: {line:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -214,7 +203,6 @@ async fn integration_creates_and_checks_session_file() {
|
||||
if let Some(c) = item.get("content") {
|
||||
if c.to_string().contains(&marker) {
|
||||
matching_path = Some(path.to_path_buf());
|
||||
eprintln!("found matching path: {path:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -225,7 +213,6 @@ async fn integration_creates_and_checks_session_file() {
|
||||
std::thread::sleep(Duration::from_millis(50));
|
||||
}
|
||||
}
|
||||
eprintln!("matching_path: {matching_path:?}");
|
||||
|
||||
let path = match matching_path {
|
||||
Some(p) => p,
|
||||
|
||||
@@ -83,6 +83,7 @@ use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::protocol::Submission;
|
||||
use crate::protocol::TaskCompleteEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutSetup;
|
||||
use crate::safety::SafetyCheck;
|
||||
use crate::safety::assess_command_safety;
|
||||
use crate::safety::assess_patch_safety;
|
||||
@@ -598,14 +599,18 @@ async fn submission_loop(
|
||||
return;
|
||||
}
|
||||
|
||||
let (rollout_recorder, restored_items, restored_prev_id, session_id) =
|
||||
crate::rollout::prepare_rollout_recorder(
|
||||
&config,
|
||||
session_id,
|
||||
instructions.clone(),
|
||||
resume_path.as_deref(),
|
||||
)
|
||||
.await;
|
||||
let RolloutSetup {
|
||||
recorder: rollout_recorder,
|
||||
restored_items,
|
||||
restored_prev_id,
|
||||
session_id,
|
||||
} = crate::rollout::prepare_rollout_recorder(
|
||||
&config,
|
||||
session_id,
|
||||
instructions.clone(),
|
||||
resume_path.as_deref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let client = ModelClient::new(
|
||||
config.clone(),
|
||||
|
||||
@@ -45,6 +45,13 @@ pub struct SavedSession {
|
||||
pub session_id: Uuid,
|
||||
}
|
||||
|
||||
pub struct RolloutSetup {
|
||||
pub recorder: Option<RolloutRecorder>,
|
||||
pub restored_items: Option<Vec<ResponseItem>>,
|
||||
pub restored_prev_id: Option<String>,
|
||||
pub session_id: Uuid,
|
||||
}
|
||||
|
||||
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
||||
/// every update.
|
||||
///
|
||||
@@ -258,7 +265,6 @@ async fn rollout_writer(
|
||||
let mut buf = serde_json::to_vec(value)?;
|
||||
buf.push(b'\n');
|
||||
file.write_all(&buf).await?;
|
||||
// TODO: decide if we want to flush here or TaskComplete is enough.
|
||||
file.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -315,12 +321,7 @@ pub async fn prepare_rollout_recorder(
|
||||
mut session_id: Uuid,
|
||||
instructions: Option<String>,
|
||||
resume_path: Option<&Path>,
|
||||
) -> (
|
||||
Option<RolloutRecorder>,
|
||||
Option<Vec<ResponseItem>>, // restored_items
|
||||
Option<String>, // restored_prev_id
|
||||
Uuid, // possibly updated session_id
|
||||
) {
|
||||
) -> RolloutSetup {
|
||||
// Try to resume
|
||||
let (mut restored_items, mut restored_prev_id, mut recorder_opt) = (None, None, None);
|
||||
|
||||
@@ -350,5 +351,10 @@ pub async fn prepare_rollout_recorder(
|
||||
}
|
||||
}
|
||||
|
||||
(recorder_opt, restored_items, restored_prev_id, session_id)
|
||||
RolloutSetup {
|
||||
recorder: recorder_opt,
|
||||
restored_items,
|
||||
restored_prev_id,
|
||||
session_id,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user