Compare commits

...

2 Commits

Author SHA1 Message Date
jif-oai
38fba2b18f Fix unified exec output races and aggregation 2025-12-10 11:34:54 +00:00
jif-oai
6779bd5a2d fix: flaky tests 1 2025-12-10 11:13:34 +00:00
2 changed files with 21 additions and 8 deletions

View File

@@ -132,6 +132,7 @@ struct SessionEntry {
cwd: PathBuf,
started_at: tokio::time::Instant,
last_used: tokio::time::Instant,
aggregated_output: String,
}
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {

View File

@@ -196,6 +196,8 @@ impl UnifiedExecSessionManager {
)
.await;
self.append_session_output(&request.process_id, &output)
.await;
Self::emit_waiting_status(&context.session, &context.turn, &request.command).await;
};
@@ -259,6 +261,9 @@ impl UnifiedExecSessionManager {
let original_token_count = approx_token_count(&text);
let chunk_id = generate_chunk_id();
self.append_session_output(process_id.as_str(), &output)
.await;
let status = self.refresh_session_state(process_id.as_str()).await;
let (process_id, exit_code, completion_entry, event_call_id) = match status {
SessionStatus::Alive {
@@ -294,8 +299,7 @@ impl UnifiedExecSessionManager {
if let (Some(exit), Some(entry)) = (response.exit_code, completion_entry) {
let total_duration = Instant::now().saturating_duration_since(entry.started_at);
Self::emit_exec_end_from_entry(entry, response.output.clone(), exit, total_duration)
.await;
Self::emit_exec_end_from_entry(entry, exit, total_duration).await;
}
Ok(response)
@@ -388,6 +392,7 @@ impl UnifiedExecSessionManager {
cwd,
started_at,
last_used: started_at,
aggregated_output: String::new(),
};
let number_sessions = {
let mut store = self.session_store.lock().await;
@@ -407,12 +412,19 @@ impl UnifiedExecSessionManager {
};
}
async fn emit_exec_end_from_entry(
entry: SessionEntry,
aggregated_output: String,
exit_code: i32,
duration: Duration,
) {
async fn append_session_output(&self, process_id: &str, output: &str) {
if output.is_empty() {
return;
}
let mut store = self.session_store.lock().await;
if let Some(entry) = store.sessions.get_mut(process_id) {
entry.aggregated_output.push_str(output);
}
}
async fn emit_exec_end_from_entry(entry: SessionEntry, exit_code: i32, duration: Duration) {
let aggregated_output = entry.aggregated_output.clone();
let output = ExecToolCallOutput {
exit_code,
stdout: StreamOutput::new(aggregated_output.clone()),