mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
V2
This commit is contained in:
5
codex-rs/core/src/state/mod.rs
Normal file
5
codex-rs/core/src/state/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub(crate) mod session;
|
||||
pub(crate) mod turn;
|
||||
|
||||
pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::TurnState;
|
||||
48
codex-rs/core/src/state/session.rs
Normal file
48
codex-rs/core/src/state/session.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::conversation_history::ConversationHistory;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
|
||||
pub(crate) struct SessionState {
|
||||
pub(crate) approved_commands: HashSet<Vec<String>>,
|
||||
pub(crate) pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||
pub(crate) history: ConversationHistory,
|
||||
pub(crate) token_info: Option<TokenUsageInfo>,
|
||||
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||
readiness_queue: VecDeque<Arc<ReadinessFlag>>,
|
||||
}
|
||||
|
||||
impl SessionState {
|
||||
pub(crate) fn push_readiness(&mut self, flag: Arc<ReadinessFlag>) {
|
||||
self.readiness_queue.push_back(flag);
|
||||
}
|
||||
|
||||
pub(crate) fn next_readiness(&mut self) -> Option<Arc<ReadinessFlag>> {
|
||||
self.readiness_queue.pop_front()
|
||||
}
|
||||
|
||||
pub(crate) fn clear_readiness(&mut self) {
|
||||
self.readiness_queue.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SessionState {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
approved_commands: HashSet::new(),
|
||||
pending_approvals: HashMap::new(),
|
||||
history: ConversationHistory::new(),
|
||||
token_info: None,
|
||||
latest_rate_limits: None,
|
||||
readiness_queue: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
120
codex-rs/core/src/state/turn.rs
Normal file
120
codex-rs/core/src/state/turn.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::client::ModelClient;
|
||||
use crate::config_types::ShellEnvironmentPolicy;
|
||||
use crate::openai_tools::ToolsConfig;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::InputItem;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TurnContext {
|
||||
pub(crate) client: ModelClient,
|
||||
/// The session's current working directory. All relative paths provided by
|
||||
/// the model as well as sandbox policies are resolved against this path
|
||||
/// instead of `std::env::current_dir()`.
|
||||
pub(crate) cwd: PathBuf,
|
||||
pub(crate) base_instructions: Option<String>,
|
||||
pub(crate) user_instructions: Option<String>,
|
||||
pub(crate) approval_policy: AskForApproval,
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
|
||||
pub(crate) tools_config: ToolsConfig,
|
||||
pub(crate) is_review_mode: bool,
|
||||
pub(crate) final_output_json_schema: Option<Value>,
|
||||
}
|
||||
|
||||
impl TurnContext {
|
||||
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
|
||||
path.as_ref()
|
||||
.map(PathBuf::from)
|
||||
.map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnMailbox {
|
||||
latest_readiness: Option<Arc<ReadinessFlag>>,
|
||||
pending: VecDeque<ResponseInputItem>,
|
||||
}
|
||||
|
||||
pub(crate) struct TurnState {
|
||||
sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
initial_readiness: Option<Arc<ReadinessFlag>>,
|
||||
mailbox: Mutex<TurnMailbox>,
|
||||
}
|
||||
|
||||
impl TurnState {
|
||||
pub(crate) fn new(
|
||||
sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_input: Vec<InputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> Self {
|
||||
let initial_input = if initial_input.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(initial_input.into())
|
||||
};
|
||||
|
||||
Self {
|
||||
sub_id,
|
||||
turn_context,
|
||||
initial_input,
|
||||
initial_readiness: readiness,
|
||||
mailbox: Mutex::new(TurnMailbox::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sub_id(&self) -> &str {
|
||||
&self.sub_id
|
||||
}
|
||||
|
||||
pub(crate) fn turn_context(&self) -> Arc<TurnContext> {
|
||||
Arc::clone(&self.turn_context)
|
||||
}
|
||||
|
||||
pub(crate) fn initial_input(&self) -> Option<ResponseInputItem> {
|
||||
self.initial_input.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn initial_readiness(&self) -> Option<Arc<ReadinessFlag>> {
|
||||
self.initial_readiness.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn enqueue_user_input(
|
||||
&self,
|
||||
items: Vec<InputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) {
|
||||
let mut mailbox = self.mailbox.lock().await;
|
||||
if let Some(flag) = readiness {
|
||||
mailbox.latest_readiness = Some(flag);
|
||||
}
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let input: ResponseInputItem = items.into();
|
||||
mailbox.pending.push_back(input);
|
||||
}
|
||||
|
||||
pub(crate) async fn drain_mailbox(
|
||||
&self,
|
||||
current: Option<Arc<ReadinessFlag>>,
|
||||
) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
|
||||
let mut mailbox = self.mailbox.lock().await;
|
||||
let readiness = mailbox.latest_readiness.take().or(current);
|
||||
let items = mailbox.pending.drain(..).map(ResponseItem::from).collect();
|
||||
(items, readiness)
|
||||
}
|
||||
}
|
||||
99
codex-rs/state.md
Normal file
99
codex-rs/state.md
Normal file
@@ -0,0 +1,99 @@
|
||||
# TurnState Refactor Plan
|
||||
|
||||
## Motivation
|
||||
- The current `State` inside `core/src/codex.rs` mixes long-lived session data (e.g. history, approved commands) with per-turn information (`pending_input`, transient readiness flags).
|
||||
- Readiness is delivered over a side channel and threaded through many call sites as an `Option<Arc<ReadinessFlag>>`, making it hard to reason about when the flag is consumed, reused, or replaced.
|
||||
- Follow-up user messages while a turn is streaming are pushed into a session-level vector; the lifecycle for those items is implicit inside `run_task`.
|
||||
- Introducing an explicit `TurnState` object lets us capture everything that belongs to one user turn, ensures it is dropped when the turn finishes, and gives us a single place to own the readiness flag.
|
||||
|
||||
## Proposed data structures
|
||||
|
||||
### `TurnState`
|
||||
```rust
|
||||
pub(crate) struct TurnState {
|
||||
/// Submission id that started this turn.
|
||||
pub sub_id: String,
|
||||
/// Per-turn context (model overrides, cwd, schema…).
|
||||
pub turn_context: Arc<TurnContext>,
|
||||
/// The initial user input bundled for the model.
|
||||
pub initial_input: ResponseInputItem,
|
||||
/// Mailbox for follow-up user inputs and readiness handoffs.
|
||||
mailbox: Mutex<TurnMailbox>,
|
||||
/// Tracks the latest agent output / review artefacts for completion events.
|
||||
last_agent_message: Mutex<Option<String>>,
|
||||
/// When in review mode we keep an isolated history to feed the child model.
|
||||
review_thread_history: Mutex<Vec<ResponseItem>>,
|
||||
/// Tracks the diff across the whole turn so we can emit `TurnDiff` events once.
|
||||
diff_tracker: Mutex<TurnDiffTracker>,
|
||||
/// Whether we already tried auto-compaction in this turn.
|
||||
auto_compact_recently_attempted: AtomicBool,
|
||||
}
|
||||
```
|
||||
|
||||
`TurnMailbox` is a helper that keeps the queue of pending turn inputs and the most recent readiness flag:
|
||||
```rust
|
||||
struct TurnMailbox {
|
||||
latest_readiness: Option<Arc<ReadinessFlag>>,
|
||||
pending: VecDeque<PendingTurnInput>,
|
||||
}
|
||||
```
|
||||
|
||||
`PendingTurnInput` keeps the shape it already has today (`ResponseInputItem` plus the readiness flag that was active when it was enqueued).
|
||||
|
||||
### Handle vs. runtime
|
||||
- `TurnState` is reference-counted (`Arc<TurnState>`) so both the session (for injection) and the running task can access it.
|
||||
- Runtime-only helpers (prompt building, retry counters) remain inside `run_task`; they borrow data from `TurnState` instead of keeping their own copies.
|
||||
|
||||
## Lifecycle management
|
||||
1. **Creation** – When `submission_loop` receives `Op::UserInput` or `Op::UserTurn` and there is no active task, it constructs a `TurnState`:
|
||||
- Build/resolve the `TurnContext` (either reuse the persistent one or apply the per-turn overrides).
|
||||
- Collect the initial readiness flag from the readiness receiver. Instead of losing the flag when `try_recv` fails, we push it into a queue owned by the session; `TurnState::new` pops from that queue. If nothing is available we store `None`, and the turn proceeds as if the flag were already ready.
|
||||
- Convert the submitted items into a `ResponseInputItem` and seed the mailbox with that entry. The same `TurnState::enqueue_initial_input` helper is used for review threads so every task goes through the same path.
|
||||
- Wrap the whole struct in an `Arc` and pass it to `AgentTask::spawn` (or `.review`).
|
||||
|
||||
2. **Session bookkeeping** – `State` gains two fields:
|
||||
```rust
|
||||
current_task: Option<AgentTask>,
|
||||
current_turn: Option<Arc<TurnState>>,
|
||||
```
|
||||
`Session::set_task` stores both, aborting the previous task if needed. `Session::remove_task` clears `current_turn` in addition to `current_task`.
|
||||
|
||||
3. **Injecting more user input** – `Session::inject_input` becomes a thin wrapper:
|
||||
- Grab the session mutex.
|
||||
- If there is an active `TurnState`, call `turn_state.enqueue_user_input(items, readiness)` and return `Ok(())`.
|
||||
- If not, return `Err((items, readiness))` so the submission loop knows it needs to start a fresh turn (same behaviour as today).
|
||||
The enqueue helper converts the new items into a `PendingTurnInput`, pushes it into the mailbox, and updates `latest_readiness` when a flag accompanies the message.
|
||||
|
||||
4. **Turn execution** – `run_task` now receives `Arc<TurnState>` instead of a raw `Vec<InputItem>` / readiness pair. It:
|
||||
- Grabs the initial input via `turn_state.take_initial_input()` to seed history and the review mailbox.
|
||||
- On each iteration, calls `turn_state.drain_mailbox()`, which returns `(Vec<ResponseItem>, Option<Arc<ReadinessFlag>>)`, so the loop no longer needs to manipulate the readiness flag manually. `TurnMailbox` ensures we always hand out the most recent readiness flag (the newest non-`None` entry wins).
|
||||
- Accesses the diff tracker, review history, and auto-compaction flag through the `TurnState` rather than local variables. This keeps the single source of truth tied to the turn’s lifetime and makes debugging easier.
|
||||
- Writes the last assistant message into `turn_state` before signalling `TaskComplete` so listeners can retrieve it even if the task is aborted elsewhere.
|
||||
|
||||
5. **Completion** – When the loop finishes (success, interruption, or error) we drop `Arc<TurnState>` by clearing `current_turn`. All readiness waiters associated with the turn naturally drop because the only owner lives on the turn state.
|
||||
|
||||
## Readiness handling
|
||||
- `TurnReadinessBridge` in the TUI continues to send `Arc<ReadinessFlag>` values over the readiness channel; the session stores them in a short queue (`VecDeque<Arc<ReadinessFlag>>`) protected by the same mutex that guards `State`.
|
||||
- `TurnState::new` pops the next flag when constructing the mailbox. If the queue is empty we log (with rate limiting) and store `None` so the turn stays unblocked.
|
||||
- `TurnState::enqueue_user_input` accepts an optional flag. When present we update `latest_readiness` before pushing the input so subsequent `drain_mailbox` calls hand the new flag to `run_turn`.
|
||||
- `run_turn` and `handle_response_item` only see `turn_state.current_readiness()`, which eliminates the ad-hoc `current_turn_readiness` variable that is currently threaded through the loop.
|
||||
- Because the readiness flag lives on the `TurnState`, tool handlers that are spawned outside the loop (e.g. background exec streams) can clone the flag from the turn state if they need to delay until the user confirms.
|
||||
|
||||
## Changes to submission loop
|
||||
- Replace the existing `turn_readiness_rx.try_recv()` calls with a helper on the session such as `Session::next_turn_readiness()` that returns the oldest queued flag (or `None`). `TurnState::new` receives that value and stores it in its mailbox.
|
||||
- The submission loop no longer passes readiness into `AgentTask::spawn`; instead it constructs the `TurnState` (with readiness embedded) and hands the state to the task constructor.
|
||||
- For review turns and compaction tasks, we construct a `TurnState` with `None` readiness. The helper works for both flows so we can remove the separate code paths that bypass readiness today.
|
||||
|
||||
## Implementation plan
|
||||
1. Introduce the `turn_state` module with `TurnState`, `TurnMailbox`, and helpers to enqueue / drain inputs and expose readiness.
|
||||
2. Extend `State` with `current_turn` and a `VecDeque<Arc<ReadinessFlag>>` used to store unread readiness flags pushed by the UI.
|
||||
3. Update the readiness sender plumbing so `Codex::turn_readiness_sender()` pushes into that queue; remove the direct `try_recv` usage.
|
||||
4. Refactor `AgentTask::spawn` / `run_task` to accept `Arc<TurnState>` and use the new helper methods for initial input, pending input, diff tracking, and readiness.
|
||||
5. Simplify `Session::inject_input` to route through the active `TurnState` instead of manipulating `state.pending_input` directly. Drop the `PendingTurnInput` vector from `State` once all call sites are migrated.
|
||||
6. Move per-turn temporaries (`last_agent_message`, review mailbox, diff tracker, auto-compact flag) into `TurnState`; this lets us delete the bespoke locals in `run_task` and make the turn lifecycle self-contained.
|
||||
7. After the refactor, audit call sites to ensure readiness is consistently fetched from the turn state, delete the now-unused `turn_readiness` parameters, and clean up warnings.
|
||||
|
||||
## Follow-up considerations
|
||||
- With `TurnState` owning the readiness flag we can extend it later to expose richer readiness semantics (e.g. multiple tokens, logging) without touching the submission loop again.
|
||||
- This refactor lays the groundwork for queuing multiple `TurnState`s if we later want to support full multi-turn buffering instead of mutating the live turn.
|
||||
- Once `TurnState` is in place, the session-level mutex guards much less data, which could be split further if concurrency becomes a bottleneck.
|
||||
Reference in New Issue
Block a user