Compare commits

...

2 Commits

Author SHA1 Message Date
pakrym-oai
3be49da40a core: refactor TurnEvents to use Arc<Session> instead of Sender<Event>
- TurnEvents no longer holds a Sender<Event>, but an Arc<Session>, and events are sent via Session methods.
- Update TurnEvents::new signature and all call sites to pass Arc<Session>.
- Update TurnEvents usage in Session and tests for Arc<Session> changes.
- Rename item_collector.rs to turn_events.rs and update module visibility.
- Remove unused item_collector mod.
- Update debug
2025-10-20 17:51:54 -07:00
pakrym-oai
1a3ab925cb Simplify event publishing 2025-10-20 17:03:18 -07:00
4 changed files with 91 additions and 47 deletions

View File

@@ -10,7 +10,7 @@ use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::state::TurnEvents;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
@@ -267,7 +267,7 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) item_collector: ItemCollector,
pub(crate) turn_events: TurnEvents,
}
impl TurnContext {
@@ -351,12 +351,13 @@ pub(crate) struct SessionSettingsUpdate {
impl Session {
fn make_turn_context(
sub_id: String,
auth_manager: Option<Arc<AuthManager>>,
otel_event_manager: &OtelEventManager,
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
session: Arc<Session>,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
let model_family = find_family_for_model(&session_configuration.model)
@@ -402,7 +403,7 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()),
turn_events: TurnEvents::new(session, conversation_id, sub_id.clone(), sub_id),
}
}
@@ -615,8 +616,11 @@ impl Session {
format!("auto-compact-{id}")
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_turn(SessionSettingsUpdate::default()).await;
async fn record_initial_history(self: &Arc<Self>, conversation_history: InitialHistory) {
// TODO(pakrym): Ideally we shouldn't need to create a fake turn context here.
let turn_context = self
.new_turn("init".to_string(), SessionSettingsUpdate::default())
.await;
match conversation_history {
InitialHistory::New => {
// Build and record initial items (user instructions + environment context)
@@ -648,7 +652,11 @@ impl Session {
state.session_configuration = state.session_configuration.apply(&updates);
}
pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc<TurnContext> {
pub(crate) async fn new_turn(
self: &Arc<Self>,
sub_id: String,
updates: SessionSettingsUpdate,
) -> Arc<TurnContext> {
let session_configuration = {
let mut state = self.state.lock().await;
let session_configuration = state.session_configuration.clone().apply(&updates);
@@ -657,12 +665,13 @@ impl Session {
};
let mut turn_context: TurnContext = Self::make_turn_context(
sub_id,
Some(Arc::clone(&self.services.auth_manager)),
&self.services.otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
self.clone(),
);
if let Some(final_schema) = updates.final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
@@ -1140,7 +1149,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::UserInput { items } => (items, SessionSettingsUpdate::default()),
_ => unreachable!(),
};
let current_context = sess.new_turn(updates).await;
let current_context = sess.new_turn(sub.id.clone(), updates).await;
current_context
.client
.get_otel_event_manager()
@@ -1165,7 +1174,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
current_context
.item_collector
.turn_events
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;
@@ -1277,7 +1286,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
sess.send_event(event).await;
}
Op::Compact => {
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
let turn_context = sess
.new_turn(sub.id.clone(), SessionSettingsUpdate::default())
.await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![UserInput::Text {
@@ -1347,7 +1358,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
sess.send_event(event).await;
}
Op::Review { review_request } => {
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
let turn_context: Arc<TurnContext> = sess
.new_turn(sub.id.clone(), SessionSettingsUpdate::default())
.await;
spawn_review_thread(
sess.clone(),
config.clone(),
@@ -1434,10 +1447,11 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(
sess.get_tx_event(),
turn_events: TurnEvents::new(
sess.clone(),
sess.conversation_id,
sub_id.to_string(),
sub_id.clone(),
),
};
@@ -2600,7 +2614,7 @@ mod tests {
)
}
pub(crate) fn make_session_and_context() -> (Session, TurnContext) {
pub(crate) fn make_session_and_context() -> (Arc<Session>, TurnContext) {
let (tx_event, _rx_event) = async_channel::unbounded();
let codex_home = tempfile::tempdir().expect("create temp dir");
let config = Config::load_from_base_config_with_overrides(
@@ -2641,15 +2655,6 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};
let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
);
let session = Session {
conversation_id,
tx_event,
@@ -2659,6 +2664,18 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
};
let session = Arc::new(session);
let turn_context = Session::make_turn_context(
"sub_1".to_string(),
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
session.clone(),
);
(session, turn_context)
}
@@ -2709,15 +2726,6 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};
let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
));
let session = Arc::new(Session {
conversation_id,
tx_event,
@@ -2726,6 +2734,16 @@ mod tests {
services,
next_internal_sub_id: AtomicU64::new(0),
});
let session = Arc::clone(&session);
let turn_context = Arc::new(Session::make_turn_context(
"sub_1".to_string(),
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
session.clone(),
));
(session, turn_context, rx_event)
}

View File

@@ -1,11 +1,11 @@
mod item_collector;
mod service;
mod session;
mod turn;
mod turn_events;
pub(crate) use item_collector::ItemCollector;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn_events::TurnEvents;

View File

@@ -1,4 +1,6 @@
use async_channel::Sender;
use core::fmt;
use std::sync::Arc;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::Event;
@@ -7,29 +9,44 @@ use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use tracing::error;
#[derive(Debug)]
pub(crate) struct ItemCollector {
use crate::codex::Session;
pub(crate) struct TurnEvents {
thread_id: ConversationId,
sub_id: String,
turn_id: String,
tx_event: Sender<Event>,
session: Arc<Session>,
}
impl ItemCollector {
impl fmt::Debug for TurnEvents {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TurnEvents {{ thread_id: {}, sub_id: {}, turn_id: {} }}",
self.thread_id, self.sub_id, self.turn_id
)
}
}
impl TurnEvents {
pub fn new(
tx_event: Sender<Event>,
session: Arc<Session>,
thread_id: ConversationId,
sub_id: String,
turn_id: String,
) -> ItemCollector {
ItemCollector {
tx_event,
) -> TurnEvents {
TurnEvents {
thread_id,
sub_id,
turn_id,
session,
}
}
pub async fn started(&self, item: TurnItem) {
let err = self
.tx_event
.session
.get_tx_event()
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemStarted(ItemStartedEvent {
@@ -46,7 +63,8 @@ impl ItemCollector {
pub async fn completed(&self, item: TurnItem) {
let err = self
.tx_event
.session
.get_tx_event()
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
@@ -65,4 +83,12 @@ impl ItemCollector {
self.started(item.clone()).await;
self.completed(item).await;
}
pub async fn legacy(&self, msg: EventMsg) {
let event = Event {
id: self.sub_id.clone(),
msg,
};
self.session.send_event(event).await;
}
}

View File

@@ -86,7 +86,7 @@ mod tests {
let (session, mut turn) = make_session_and_context();
turn.approval_policy = AskForApproval::Never;
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
(Arc::new(session), Arc::new(turn))
(session, Arc::new(turn))
}
async fn run_unified_exec_request(