mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Reuse shared state DB for agent resume
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -298,10 +298,16 @@ impl AgentControl {
|
||||
config: crate::config::Config,
|
||||
thread_id: ThreadId,
|
||||
session_source: SessionSource,
|
||||
state_db_ctx: Option<state_db::StateDbHandle>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let root_depth = thread_spawn_depth(&session_source).unwrap_or(0);
|
||||
let resumed_thread_id = self
|
||||
.resume_single_agent_from_rollout(config.clone(), thread_id, session_source)
|
||||
.resume_single_agent_from_rollout(
|
||||
config.clone(),
|
||||
thread_id,
|
||||
session_source,
|
||||
state_db_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let state = self.upgrade()?;
|
||||
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
|
||||
@@ -347,6 +353,7 @@ impl AgentControl {
|
||||
config.clone(),
|
||||
child_thread_id,
|
||||
child_session_source,
|
||||
Some(state_db_ctx.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -371,6 +378,7 @@ impl AgentControl {
|
||||
mut config: crate::config::Config,
|
||||
thread_id: ThreadId,
|
||||
session_source: SessionSource,
|
||||
state_db_ctx: Option<state_db::StateDbHandle>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source
|
||||
&& *depth >= config.agent_max_depth
|
||||
@@ -380,7 +388,10 @@ impl AgentControl {
|
||||
}
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let state_db_ctx = state_db::get_state_db(&config).await;
|
||||
let state_db_ctx = match state_db_ctx {
|
||||
Some(state_db_ctx) => Some(state_db_ctx),
|
||||
None => state_db::get_state_db(&config).await,
|
||||
};
|
||||
let (session_source, agent_metadata) = match session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
|
||||
@@ -298,7 +298,12 @@ async fn resume_agent_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.resume_agent_from_rollout(config, ThreadId::new(), SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config,
|
||||
ThreadId::new(),
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -942,7 +947,12 @@ async fn resume_agent_respects_max_threads_limit() {
|
||||
.expect("spawn_agent should succeed for active slot");
|
||||
|
||||
let err = control
|
||||
.resume_agent_from_rollout(config, resumable_id, SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config,
|
||||
resumable_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume should respect max threads");
|
||||
let CodexErr::AgentLimitReached {
|
||||
@@ -975,7 +985,12 @@ async fn resume_agent_releases_slot_after_resume_failure() {
|
||||
let control = manager.agent_control();
|
||||
|
||||
let _ = control
|
||||
.resume_agent_from_rollout(config.clone(), ThreadId::new(), SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config.clone(),
|
||||
ThreadId::new(),
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume should fail for missing rollout path");
|
||||
|
||||
@@ -1454,6 +1469,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("resume should succeed");
|
||||
@@ -1540,7 +1556,12 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
|
||||
|
||||
let resumed_thread_id = harness
|
||||
.control
|
||||
.resume_agent_from_rollout(harness.config.clone(), child_thread_id, SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
harness.config.clone(),
|
||||
child_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("resume should find archived rollout");
|
||||
assert_eq!(resumed_thread_id, child_thread_id);
|
||||
@@ -1799,6 +1820,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("single-thread resume should succeed");
|
||||
@@ -1895,6 +1917,7 @@ async fn resume_closed_child_reopens_open_descendants() {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("child resume should succeed");
|
||||
@@ -1987,6 +2010,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("tree resume should succeed");
|
||||
@@ -2100,6 +2124,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("tree resume should succeed");
|
||||
@@ -2215,6 +2240,7 @@ async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails()
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("root resume should succeed");
|
||||
|
||||
@@ -163,6 +163,7 @@ async fn try_resume_closed_agent(
|
||||
/*agent_role*/ None,
|
||||
/*task_name*/ None,
|
||||
)?,
|
||||
session.state_db(),
|
||||
)
|
||||
.await
|
||||
.map(|_| ())
|
||||
|
||||
Reference in New Issue
Block a user