mirror of https://github.com/openai/codex.git, synced 2026-02-02 15:03:38 +00:00

Compare commits: dh--reenab ... input-vali (15 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 257d5eceff | |
| | 56dfa60801 | |
| | 50055c7ed5 | |
| | 89e9bb1d42 | |
| | 81d0003f0f | |
| | 5d7983520b | |
| | 4e7089a8ab | |
| | 697367cd3f | |
| | 6ffbd0d4e3 | |
| | 79c628a823 | |
| | 9446de0923 | |
| | 0f02954edb | |
| | 7a8da22d7e | |
| | 8f8ca17da0 | |
| | 8b095d3cf1 | |
@@ -59,6 +59,7 @@ use crate::config::Config;
 use crate::config_types::McpServerTransportConfig;
 use crate::config_types::ShellEnvironmentPolicy;
 use crate::conversation_history::ConversationHistory;
+use crate::conversation_history::prefetch_tokenizer_in_background;
 use crate::environment_context::EnvironmentContext;
 use crate::error::CodexErr;
 use crate::error::Result as CodexResult;
@@ -159,6 +160,8 @@ impl Codex {
         conversation_history: InitialHistory,
         session_source: SessionSource,
     ) -> CodexResult<CodexSpawnOk> {
+        // Start loading the tokenizer in the background so we don't block later.
+        prefetch_tokenizer_in_background();
         let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
         let (tx_event, rx_event) = async_channel::unbounded();

@@ -568,7 +571,9 @@ impl Session {
         // Dispatch the SessionConfiguredEvent first and then report any errors.
         // If resuming, include converted initial messages in the payload so UIs can render them immediately.
         let initial_messages = initial_history.get_event_msgs();
-        sess.record_initial_history(initial_history).await;
+        sess.record_initial_history(initial_history)
+            .await
+            .map_err(anyhow::Error::new)?;

         let events = std::iter::once(Event {
             id: INITIAL_SUBMIT_ID.to_owned(),
@@ -601,13 +606,16 @@ impl Session {
         format!("auto-compact-{id}")
     }

-    async fn record_initial_history(&self, conversation_history: InitialHistory) {
+    async fn record_initial_history(
+        &self,
+        conversation_history: InitialHistory,
+    ) -> CodexResult<()> {
         let turn_context = self.new_turn(SessionSettingsUpdate::default()).await;
         match conversation_history {
             InitialHistory::New => {
                 // Build and record initial items (user instructions + environment context)
                 let items = self.build_initial_context(&turn_context);
-                self.record_conversation_items(&items).await;
+                self.record_conversation_items(&items).await?;
             }
             InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
                 let rollout_items = conversation_history.get_rollout_items();
@@ -615,9 +623,9 @@ impl Session {

                 // Always add response items to conversation history
                 let reconstructed_history =
-                    self.reconstruct_history_from_rollout(&turn_context, &rollout_items);
+                    self.reconstruct_history_from_rollout(&turn_context, &rollout_items)?;
                 if !reconstructed_history.is_empty() {
-                    self.record_into_history(&reconstructed_history).await;
+                    self.record_into_history(&reconstructed_history).await?;
                 }

                 // If persisting, persist all rollout items as-is (recorder filters)
@@ -626,6 +634,7 @@ impl Session {
                 }
             }
         }
+        Ok(())
     }

     pub(crate) async fn update_settings(&self, updates: SessionSettingsUpdate) {
@@ -856,21 +865,25 @@ impl Session {

     /// Records input items: always append to conversation history and
     /// persist these response items to rollout.
-    pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
-        self.record_into_history(items).await;
+    pub(crate) async fn record_conversation_items(
+        &self,
+        items: &[ResponseItem],
+    ) -> CodexResult<()> {
+        self.record_into_history(items).await?;
         self.persist_rollout_response_items(items).await;
+        Ok(())
     }

     fn reconstruct_history_from_rollout(
         &self,
         turn_context: &TurnContext,
         rollout_items: &[RolloutItem],
-    ) -> Vec<ResponseItem> {
+    ) -> CodexResult<Vec<ResponseItem>> {
         let mut history = ConversationHistory::new();
         for item in rollout_items {
             match item {
                 RolloutItem::ResponseItem(response_item) => {
-                    history.record_items(std::iter::once(response_item));
+                    history.record_items(std::iter::once(response_item))?;
                 }
                 RolloutItem::Compacted(compacted) => {
                     let snapshot = history.get_history();
@@ -885,13 +898,14 @@ impl Session {
                 _ => {}
             }
         }
-        history.get_history()
+        Ok(history.get_history())
     }

     /// Append ResponseItems to the in-memory conversation history only.
-    async fn record_into_history(&self, items: &[ResponseItem]) {
+    async fn record_into_history(&self, items: &[ResponseItem]) -> CodexResult<()> {
         let mut state = self.state.lock().await;
-        state.record_items(items.iter());
+        state.record_items(items.iter())?;
+        Ok(())
     }

     async fn replace_history(&self, items: Vec<ResponseItem>) {
@@ -1000,11 +1014,11 @@ impl Session {
         &self,
         turn_context: &TurnContext,
         response_input: &ResponseInputItem,
-    ) {
+    ) -> CodexResult<()> {
         let response_item: ResponseItem = response_input.clone().into();
         // Add to conversation history and persist response item to rollout
         self.record_conversation_items(std::slice::from_ref(&response_item))
-            .await;
+            .await?;

         // Derive user message events and persist only UserMessage to rollout
         let turn_item = parse_turn_item(&response_item);
@@ -1013,6 +1027,7 @@ impl Session {
             self.emit_turn_item_started_completed(turn_context, item, false)
                 .await;
         }
+        Ok(())
     }

     /// Helper that emits a BackgroundEvent with the given message. This keeps
@@ -1193,9 +1208,17 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
                 if let Err(items) = sess.inject_input(items).await {
                     if let Some(env_item) = sess
                         .build_environment_update_item(previous_context.as_ref(), &current_context)
+                        && let Err(err) = sess
+                            .record_conversation_items(std::slice::from_ref(&env_item))
+                            .await
                     {
-                        sess.record_conversation_items(std::slice::from_ref(&env_item))
-                            .await;
+                        sess.send_event(
+                            current_context.as_ref(),
+                            EventMsg::Error(ErrorEvent {
+                                message: err.to_string(),
+                            }),
+                        )
+                        .await;
                     }

                     sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
@@ -1508,9 +1531,9 @@ pub(crate) async fn run_task(
     input: Vec<UserInput>,
     task_kind: TaskKind,
     cancellation_token: CancellationToken,
-) -> Option<String> {
+) -> CodexResult<Option<String>> {
     if input.is_empty() {
-        return None;
+        return Ok(None);
     }
     let event = EventMsg::TaskStarted(TaskStartedEvent {
         model_context_window: turn_context.client.get_model_context_window(),
@@ -1527,11 +1550,11 @@ pub(crate) async fn run_task(
     if is_review_mode {
         // Seed review threads with environment context so the model knows the working directory.
         review_thread_history
-            .record_items(sess.build_initial_context(turn_context.as_ref()).iter());
-        review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()));
+            .record_items(sess.build_initial_context(turn_context.as_ref()).iter())?;
+        review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()))?;
     } else {
         sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
-            .await;
+            .await?;
     }

     let mut last_agent_message: Option<String> = None;
@@ -1563,11 +1586,11 @@ pub(crate) async fn run_task(
         // represents an append-only log without duplicates.
         let turn_input: Vec<ResponseItem> = if is_review_mode {
             if !pending_input.is_empty() {
-                review_thread_history.record_items(&pending_input);
+                review_thread_history.record_items(&pending_input)?;
             }
             review_thread_history.get_history()
         } else {
-            sess.record_conversation_items(&pending_input).await;
+            sess.record_conversation_items(&pending_input).await?;
            sess.history_snapshot().await
         };

@@ -1615,7 +1638,7 @@ pub(crate) async fn run_task(
                     &mut review_thread_history,
                     &sess,
                 )
-                .await;
+                .await?;

                 if token_limit_reached {
                     if auto_compact_recently_attempted {
@@ -1632,7 +1655,8 @@ pub(crate) async fn run_task(
                         break;
                     }
                     auto_compact_recently_attempted = true;
-                    compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
+                    compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone())
+                        .await?;
                     continue;
                 }

@@ -1657,13 +1681,13 @@ pub(crate) async fn run_task(
             Err(CodexErr::TurnAborted {
                 dangling_artifacts: processed_items,
             }) => {
-                let _ = process_items(
+                process_items(
                     processed_items,
                     is_review_mode,
                     &mut review_thread_history,
                     &sess,
                 )
-                .await;
+                .await?;
                 // Aborted turn is reported via a different event.
                 break;
             }
@@ -1692,10 +1716,10 @@ pub(crate) async fn run_task(
             Arc::clone(&turn_context),
             last_agent_message.as_deref().map(parse_review_output_event),
         )
-        .await;
+        .await?;
     }

-    last_agent_message
+    Ok(last_agent_message)
 }

 /// Parse the review output; when not valid JSON, build a structured
@@ -2134,7 +2158,7 @@ pub(crate) async fn exit_review_mode(
     session: Arc<Session>,
     turn_context: Arc<TurnContext>,
     review_output: Option<ReviewOutputEvent>,
-) {
+) -> CodexResult<()> {
     let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
         review_output: review_output.clone(),
     });
@@ -2177,7 +2201,8 @@ pub(crate) async fn exit_review_mode(
             role: "user".to_string(),
             content: vec![ContentItem::InputText { text: user_message }],
         }])
-        .await;
+        .await?;
+    Ok(())
 }

 fn mcp_init_error_display(
@@ -2225,6 +2250,7 @@ mod tests {
     use crate::config::ConfigToml;
     use crate::config_types::McpServerConfig;
     use crate::config_types::McpServerTransportConfig;
+    use crate::error::Result as CodexResult;
     use crate::exec::ExecToolCallOutput;
     use crate::mcp::auth::McpAuthStatusEntry;
     use crate::tools::format_exec_output_str;
@@ -2265,9 +2291,12 @@ mod tests {
     #[test]
     fn reconstruct_history_matches_live_compactions() {
         let (session, turn_context) = make_session_and_context();
-        let (rollout_items, expected) = sample_rollout(&session, &turn_context);
+        let (rollout_items, expected) =
+            sample_rollout(&session, &turn_context).expect("sample rollout");

-        let reconstructed = session.reconstruct_history_from_rollout(&turn_context, &rollout_items);
+        let reconstructed = session
+            .reconstruct_history_from_rollout(&turn_context, &rollout_items)
+            .expect("reconstruct history");

         assert_eq!(expected, reconstructed);
     }
@@ -2275,15 +2304,19 @@ mod tests {
     #[test]
     fn record_initial_history_reconstructs_resumed_transcript() {
         let (session, turn_context) = make_session_and_context();
-        let (rollout_items, expected) = sample_rollout(&session, &turn_context);
+        let (rollout_items, expected) =
+            sample_rollout(&session, &turn_context).expect("sample rollout");

-        tokio_test::block_on(session.record_initial_history(InitialHistory::Resumed(
-            ResumedHistory {
-                conversation_id: ConversationId::default(),
-                history: rollout_items,
-                rollout_path: PathBuf::from("/tmp/resume.jsonl"),
-            },
-        )));
+        tokio_test::block_on(async {
+            session
+                .record_initial_history(InitialHistory::Resumed(ResumedHistory {
+                    conversation_id: ConversationId::default(),
+                    history: rollout_items,
+                    rollout_path: PathBuf::from("/tmp/resume.jsonl"),
+                }))
+                .await
+                .expect("record resumed history");
+        });

         let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
         assert_eq!(expected, actual);
@@ -2292,9 +2325,15 @@ mod tests {
     #[test]
     fn record_initial_history_reconstructs_forked_transcript() {
         let (session, turn_context) = make_session_and_context();
-        let (rollout_items, expected) = sample_rollout(&session, &turn_context);
+        let (rollout_items, expected) =
+            sample_rollout(&session, &turn_context).expect("sample rollout");

-        tokio_test::block_on(session.record_initial_history(InitialHistory::Forked(rollout_items)));
+        tokio_test::block_on(async {
+            session
+                .record_initial_history(InitialHistory::Forked(rollout_items))
+                .await
+                .expect("record forked history");
+        });

         let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
         assert_eq!(expected, actual);
@@ -2654,10 +2693,10 @@ mod tests {
             _ctx: Arc<TurnContext>,
             _input: Vec<UserInput>,
             cancellation_token: CancellationToken,
-        ) -> Option<String> {
+        ) -> CodexResult<Option<String>> {
             if self.listen_to_cancellation_token {
                 cancellation_token.cancelled().await;
-                return None;
+                return Ok(None);
             }
             loop {
                 sleep(Duration::from_secs(60)).await;
@@ -2666,7 +2705,7 @@ mod tests {

         async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
             if let TaskKind::Review = self.kind {
-                exit_review_mode(session.clone_session(), ctx, None).await;
+                let _ = exit_review_mode(session.clone_session(), ctx, None).await;
             }
         }
     }
@@ -2822,7 +2861,7 @@ mod tests {
     fn sample_rollout(
         session: &Session,
         turn_context: &TurnContext,
-    ) -> (Vec<RolloutItem>, Vec<ResponseItem>) {
+    ) -> CodexResult<(Vec<RolloutItem>, Vec<ResponseItem>)> {
         let mut rollout_items = Vec::new();
         let mut live_history = ConversationHistory::new();

@@ -2830,7 +2869,7 @@ mod tests {
         for item in &initial_context {
             rollout_items.push(RolloutItem::ResponseItem(item.clone()));
         }
-        live_history.record_items(initial_context.iter());
+        live_history.record_items(initial_context.iter())?;

         let user1 = ResponseItem::Message {
             id: None,
@@ -2839,7 +2878,7 @@ mod tests {
                 text: "first user".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&user1));
+        live_history.record_items(std::iter::once(&user1))?;
         rollout_items.push(RolloutItem::ResponseItem(user1.clone()));

         let assistant1 = ResponseItem::Message {
@@ -2849,7 +2888,7 @@ mod tests {
                 text: "assistant reply one".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&assistant1));
+        live_history.record_items(std::iter::once(&assistant1))?;
         rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));

         let summary1 = "summary one";
@@ -2872,7 +2911,7 @@ mod tests {
                 text: "second user".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&user2));
+        live_history.record_items(std::iter::once(&user2))?;
         rollout_items.push(RolloutItem::ResponseItem(user2.clone()));

         let assistant2 = ResponseItem::Message {
@@ -2882,7 +2921,7 @@ mod tests {
                 text: "assistant reply two".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&assistant2));
+        live_history.record_items(std::iter::once(&assistant2))?;
         rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));

         let summary2 = "summary two";
@@ -2905,7 +2944,7 @@ mod tests {
                 text: "third user".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&user3));
+        live_history.record_items(std::iter::once(&user3))?;
         rollout_items.push(RolloutItem::ResponseItem(user3.clone()));

         let assistant3 = ResponseItem::Message {
@@ -2915,10 +2954,10 @@ mod tests {
                 text: "assistant reply three".to_string(),
             }],
         };
-        live_history.record_items(std::iter::once(&assistant3));
+        live_history.record_items(std::iter::once(&assistant3))?;
         rollout_items.push(RolloutItem::ResponseItem(assistant3.clone()));

-        (rollout_items, live_history.get_history())
+        Ok((rollout_items, live_history.get_history()))
     }

     #[tokio::test]

@@ -39,35 +39,35 @@ struct HistoryBridgeTemplate<'a> {
 pub(crate) async fn run_inline_auto_compact_task(
     sess: Arc<Session>,
     turn_context: Arc<TurnContext>,
-) {
+) -> CodexResult<()> {
     let input = vec![UserInput::Text {
         text: SUMMARIZATION_PROMPT.to_string(),
     }];
-    run_compact_task_inner(sess, turn_context, input).await;
+    run_compact_task_inner(sess, turn_context, input).await
 }

 pub(crate) async fn run_compact_task(
     sess: Arc<Session>,
     turn_context: Arc<TurnContext>,
     input: Vec<UserInput>,
-) -> Option<String> {
+) -> CodexResult<Option<String>> {
     let start_event = EventMsg::TaskStarted(TaskStartedEvent {
         model_context_window: turn_context.client.get_model_context_window(),
     });
     sess.send_event(&turn_context, start_event).await;
-    run_compact_task_inner(sess.clone(), turn_context, input).await;
-    None
+    run_compact_task_inner(sess.clone(), turn_context, input).await?;
+    Ok(None)
 }

 async fn run_compact_task_inner(
     sess: Arc<Session>,
     turn_context: Arc<TurnContext>,
     input: Vec<UserInput>,
-) {
+) -> CodexResult<()> {
     let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);

     let mut history = sess.clone_history().await;
-    history.record_items(&[initial_input_for_turn.into()]);
+    history.record_items(&[initial_input_for_turn.into()])?;

     let mut truncated_count = 0usize;

@@ -106,7 +106,7 @@ async fn run_compact_task_inner(
                 break;
             }
             Err(CodexErr::Interrupted) => {
-                return;
+                return Ok(());
             }
             Err(e @ CodexErr::ContextWindowExceeded) => {
                 if turn_input.len() > 1 {
@@ -124,7 +124,7 @@ async fn run_compact_task_inner(
                     message: e.to_string(),
                 });
                 sess.send_event(&turn_context, event).await;
-                return;
+                return Ok(());
             }
             Err(e) => {
                 if retries < max_retries {
@@ -142,7 +142,7 @@ async fn run_compact_task_inner(
                    message: e.to_string(),
                 });
                 sess.send_event(&turn_context, event).await;
-                return;
+                return Ok(());
             }
         }
     }
@@ -164,6 +164,7 @@ async fn run_compact_task_inner(
         message: "Compact task completed".to_string(),
     });
     sess.send_event(&turn_context, event).await;
+    Ok(())
 }

 pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
@@ -252,7 +253,8 @@ async fn drain_to_completed(
         };
         match event {
             Ok(ResponseEvent::OutputItemDone(item)) => {
-                sess.record_into_history(std::slice::from_ref(&item)).await;
+                sess.record_into_history(std::slice::from_ref(&item))
+                    .await?;
             }
             Ok(ResponseEvent::RateLimits(snapshot)) => {
                 sess.update_rate_limits(turn_context, snapshot).await;

@@ -1,11 +1,20 @@
+use std::sync::Arc;
+use std::sync::OnceLock;
+
 use codex_protocol::models::FunctionCallOutputPayload;
 use codex_protocol::models::ResponseItem;
 use codex_protocol::protocol::TokenUsage;
 use codex_protocol::protocol::TokenUsageInfo;
+use codex_utils_tokenizer::Tokenizer;
+use tokio::task;
+use tracing::error;

+static TOKENIZER: OnceLock<Option<Arc<Tokenizer>>> = OnceLock::new();
+
+use crate::error::CodexErr;

 /// Transcript of conversation history
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone)]
 pub(crate) struct ConversationHistory {
     /// The oldest items are at the beginning of the vector.
     items: Vec<ResponseItem>,
@@ -16,7 +25,7 @@ impl ConversationHistory {
     pub(crate) fn new() -> Self {
         Self {
             items: Vec::new(),
-            token_info: TokenUsageInfo::new_or_append(&None, &None, None),
+            token_info: None,
         }
     }

@@ -34,7 +43,7 @@ impl ConversationHistory {
     }

     /// `items` is ordered from oldest to newest.
-    pub(crate) fn record_items<I>(&mut self, items: I)
+    pub(crate) fn record_items<I>(&mut self, items: I) -> Result<(), CodexErr>
     where
         I: IntoIterator,
         I::Item: std::ops::Deref<Target = ResponseItem>,
@@ -43,9 +52,10 @@ impl ConversationHistory {
             if !is_api_message(&item) {
                 continue;
             }

+            self.validate_input(&item)?;
             self.items.push(item.clone());
         }
+        Ok(())
     }

     pub(crate) fn get_history(&mut self) -> Vec<ResponseItem> {
@@ -81,6 +91,65 @@ impl ConversationHistory {
         self.items.clone()
     }

+    fn validate_input(&self, item: &ResponseItem) -> Result<(), CodexErr> {
+        match item {
+            ResponseItem::Message { content, .. } => {
+                self.validate_input_content_item(content)?;
+                Ok(())
+            }
+            ResponseItem::FunctionCall { .. }
+            | ResponseItem::FunctionCallOutput { .. }
+            | ResponseItem::CustomToolCall { .. }
+            | ResponseItem::CustomToolCallOutput { .. }
+            | ResponseItem::LocalShellCall { .. }
+            | ResponseItem::Reasoning { .. }
+            | ResponseItem::WebSearchCall { .. } => Ok(()),
+            ResponseItem::Other => Err(CodexErr::InvalidInput(format!("invalid input: {item:?}"))),
+        }
+    }
+
+    fn validate_input_content_item(
+        &self,
+        content: &[codex_protocol::models::ContentItem],
+    ) -> Result<(), CodexErr> {
+        let Some(info) = &self.token_info else {
+            return Ok(());
+        };
+        // this will intentionally not check the context for the first turn before getting this information.
+        // it's acceptable tradeoff.
+        let Some(context_window) = info.model_context_window else {
+            return Ok(());
+        };
+        let tokenizer = match shared_tokenizer() {
+            Some(t) => t,
+            None => return Ok(()),
+        };
+
+        let mut input_tokens: i64 = 0;
+        for item in content {
+            match item {
+                codex_protocol::models::ContentItem::InputText { text } => {
+                    input_tokens += tokenizer.count(text);
+                }
+                codex_protocol::models::ContentItem::InputImage { .. } => {
+                    // no validation currently
+                }
+                codex_protocol::models::ContentItem::OutputText { .. } => {
+                    // no validation currently
+                }
+            }
+        }
+
+        let prior_total = info.last_token_usage.total_tokens;
+        let combined_tokens = prior_total.saturating_add(input_tokens);
+        let threshold = context_window * 95 / 100;
+        if combined_tokens > threshold {
+            return Err(CodexErr::InvalidInput("input too large".to_string()));
+        }
+
+        Ok(())
+    }
+
     fn ensure_call_outputs_present(&mut self) {
         // Collect synthetic outputs to insert immediately after their calls.
         // Store the insertion position (index of call) alongside the item so
@@ -343,6 +412,36 @@ fn error_or_panic(message: String) {
     }
 }

+fn shared_tokenizer() -> Option<Arc<Tokenizer>> {
+    TOKENIZER.get().and_then(|opt| opt.as_ref().map(Arc::clone))
+}
+
+/// Kick off background initialization of the shared tokenizer without blocking the caller.
+pub(crate) fn prefetch_tokenizer_in_background() {
+    if TOKENIZER.get().is_some() {
+        return;
+    }
+
+    // Spawn a background task to initialize the tokenizer. Use spawn_blocking in case
+    // initialization performs CPU-heavy work or file I/O.
+    tokio::spawn(async {
+        let result = task::spawn_blocking(Tokenizer::try_default).await;
+        match result {
+            Ok(Ok(tokenizer)) => {
+                let _ = TOKENIZER.set(Some(Arc::new(tokenizer)));
+            }
+            Ok(Err(error)) => {
+                error!("failed to create tokenizer: {error}");
+                let _ = TOKENIZER.set(None);
+            }
+            Err(join_error) => {
+                error!("failed to join tokenizer init task: {join_error}");
+                let _ = TOKENIZER.set(None);
+            }
+        }
+    });
+}
+
 /// Anything that is not a system message or "reasoning" message is considered
 /// an API message.
 fn is_api_message(message: &ResponseItem) -> bool {
@@ -381,7 +480,7 @@ mod tests {

     fn create_history_with_items(items: Vec<ResponseItem>) -> ConversationHistory {
         let mut h = ConversationHistory::new();
-        h.record_items(items.iter());
+        h.record_items(items.iter()).unwrap();
         h
     }

@@ -397,7 +496,7 @@ mod tests {

     #[test]
     fn filters_non_api_messages() {
-        let mut h = ConversationHistory::default();
+        let mut h = ConversationHistory::new();
         // System message is not an API message; Other is ignored.
         let system = ResponseItem::Message {
             id: None,
@@ -406,12 +505,12 @@ mod tests {
                 text: "ignored".to_string(),
             }],
         };
-        h.record_items([&system, &ResponseItem::Other]);
+        h.record_items([&system, &ResponseItem::Other]).unwrap();

         // User and assistant should be retained.
         let u = user_msg("hi");
         let a = assistant_msg("hello");
-        h.record_items([&u, &a]);
+        h.record_items([&u, &a]).unwrap();

         let items = h.contents();
         assert_eq!(
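Taken on its own, the validation added above boils down to a token-budget check against 95% of the model context window. Below is a minimal, self-contained sketch of just that arithmetic; the function name and pre-counted token totals are illustrative stand-ins, not the crate's real `ConversationHistory` API.

```rust
// Illustrative sketch of the 95%-of-context-window check shown in the diff.
// `prior_total` mirrors info.last_token_usage.total_tokens and
// `input_tokens` the tokenizer count of the incoming InputText items.
fn would_exceed_window(prior_total: i64, input_tokens: i64, context_window: i64) -> bool {
    let combined = prior_total.saturating_add(input_tokens);
    let threshold = context_window * 95 / 100;
    combined > threshold
}

fn main() {
    // With a 20_000-token window the threshold is 19_000 tokens, so an input
    // that pushes the running total past it is rejected before any request.
    assert!(would_exceed_window(18_500, 600, 20_000));
    assert!(!would_exceed_window(18_500, 400, 20_000));
}
```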
@@ -158,6 +158,9 @@ pub enum CodexErr {

     #[error("{0}")]
     EnvVar(EnvVarError),
+
+    #[error("invalid input: {0}")]
+    InvalidInput(String),
 }

 impl From<CancelErr> for CodexErr {
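The `#[error("invalid input: {0}")]` attribute on the new variant is what produces the exact message the tests later assert on. A small sketch of that mechanism, assuming the `thiserror` crate (which the `#[error]` attributes suggest) and using a stand-in enum rather than the real `CodexErr`:

```rust
// Stand-in enum to show how the Display string is derived from the attribute.
use thiserror::Error;

#[derive(Debug, Error)]
enum DemoErr {
    #[error("invalid input: {0}")]
    InvalidInput(String),
}

fn main() {
    let err = DemoErr::InvalidInput("input too large".to_string());
    // The exact string asserted by the tests further down in this diff.
    assert_eq!(err.to_string(), "invalid input: input too large");
}
```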
@@ -1,5 +1,6 @@
 use crate::codex::Session;
 use crate::conversation_history::ConversationHistory;
+use crate::error::Result as CodexResult;
 use codex_protocol::models::FunctionCallOutputPayload;
 use codex_protocol::models::ResponseInputItem;
 use codex_protocol::models::ResponseItem;
@@ -13,7 +14,7 @@ pub(crate) async fn process_items(
     is_review_mode: bool,
     review_thread_history: &mut ConversationHistory,
     sess: &Session,
-) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
+) -> CodexResult<(Vec<ResponseInputItem>, Vec<ResponseItem>)> {
     let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
     let mut responses = Vec::<ResponseInputItem>::new();
     for processed_response_item in processed_items {
@@ -102,11 +103,11 @@ pub(crate) async fn process_items(
     // Only attempt to take the lock if there is something to record.
     if !items_to_record_in_conversation_history.is_empty() {
         if is_review_mode {
-            review_thread_history.record_items(items_to_record_in_conversation_history.iter());
+            review_thread_history.record_items(items_to_record_in_conversation_history.iter())?;
         } else {
             sess.record_conversation_items(&items_to_record_in_conversation_history)
-                .await;
+                .await?;
         }
     }
-    (responses, items_to_record_in_conversation_history)
+    Ok((responses, items_to_record_in_conversation_history))
 }
@@ -4,6 +4,7 @@ use codex_protocol::models::ResponseItem;

 use crate::codex::SessionConfiguration;
 use crate::conversation_history::ConversationHistory;
+use crate::error::CodexErr;
 use crate::protocol::RateLimitSnapshot;
 use crate::protocol::TokenUsage;
 use crate::protocol::TokenUsageInfo;
@@ -26,12 +27,13 @@ impl SessionState {
     }

     // History helpers
-    pub(crate) fn record_items<I>(&mut self, items: I)
+    pub(crate) fn record_items<I>(&mut self, items: I) -> Result<(), CodexErr>
     where
         I: IntoIterator,
         I::Item: std::ops::Deref<Target = ResponseItem>,
     {
-        self.history.record_items(items)
+        self.history.record_items(items)?;
+        Ok(())
     }

     pub(crate) fn history_snapshot(&mut self) -> Vec<ResponseItem> {
@@ -66,7 +68,14 @@ impl SessionState {
     pub(crate) fn token_info_and_rate_limits(
         &self,
     ) -> (Option<TokenUsageInfo>, Option<RateLimitSnapshot>) {
-        (self.token_info(), self.latest_rate_limits.clone())
+        let info = self.token_info().and_then(|info| {
+            if info.total_token_usage.is_zero() && info.last_token_usage.is_zero() {
+                None
+            } else {
+                Some(info)
+            }
+        });
+        (info, self.latest_rate_limits.clone())
     }

     pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
@@ -5,6 +5,7 @@ use tokio_util::sync::CancellationToken;

 use crate::codex::TurnContext;
 use crate::codex::compact;
+use crate::error::Result as CodexResult;
 use crate::state::TaskKind;
 use codex_protocol::user_input::UserInput;

@@ -26,7 +27,7 @@ impl SessionTask for CompactTask {
         ctx: Arc<TurnContext>,
         input: Vec<UserInput>,
         _cancellation_token: CancellationToken,
-    ) -> Option<String> {
+    ) -> CodexResult<Option<String>> {
         compact::run_compact_task(session.clone_session(), ctx, input).await
     }
 }
@@ -15,6 +15,8 @@ use tracing::warn;

 use crate::codex::Session;
 use crate::codex::TurnContext;
+use crate::error::Result as CodexResult;
+use crate::protocol::ErrorEvent;
 use crate::protocol::EventMsg;
 use crate::protocol::TaskCompleteEvent;
 use crate::protocol::TurnAbortReason;
@@ -56,7 +58,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
         ctx: Arc<TurnContext>,
         input: Vec<UserInput>,
         cancellation_token: CancellationToken,
-    ) -> Option<String>;
+    ) -> CodexResult<Option<String>>;

     async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
         let _ = (session, ctx);
@@ -86,7 +88,7 @@ impl Session {
         let task_cancellation_token = cancellation_token.child_token();
         tokio::spawn(async move {
             let ctx_for_finish = Arc::clone(&ctx);
-            let last_agent_message = task_for_run
+            let run_result = task_for_run
                 .run(
                     Arc::clone(&session_ctx),
                     ctx,
@@ -98,8 +100,21 @@ impl Session {
             if !task_cancellation_token.is_cancelled() {
                 // Emit completion uniformly from spawn site so all tasks share the same lifecycle.
                 let sess = session_ctx.clone_session();
-                sess.on_task_finished(ctx_for_finish, last_agent_message)
-                    .await;
+                match run_result {
+                    Ok(last_agent_message) => {
+                        sess.on_task_finished(ctx_for_finish, last_agent_message)
+                            .await;
+                    }
+                    Err(err) => {
+                        let message = err.to_string();
+                        sess.send_event(
+                            ctx_for_finish.as_ref(),
+                            EventMsg::Error(ErrorEvent { message }),
+                        )
+                        .await;
+                        sess.on_task_finished(ctx_for_finish, None).await;
+                    }
+                }
             }
             done_clone.notify_waiters();
         })
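The spawn-site change above follows a common shape: the spawned task now returns a `Result`, and the spawn site matches on it, reporting an error out-of-band while still running the shared completion path. A minimal, self-contained sketch of that shape under assumed simplifications (tokio only; an mpsc channel stands in for `send_event`/`on_task_finished`, which are not shown here):

```rust
// Sketch of the "match the task's Result at the spawn site" pattern.
use tokio::sync::mpsc;

async fn run_task(fail: bool) -> Result<Option<String>, String> {
    if fail {
        Err("invalid input: input too large".to_string())
    } else {
        Ok(Some("done".to_string()))
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();
    let handle = tokio::spawn(async move {
        match run_task(true).await {
            // Success: report the last agent message through the normal path.
            Ok(last_agent_message) => {
                let _ = tx.send(format!("task finished: {last_agent_message:?}"));
            }
            // Failure: surface the error as an event, then finish with None.
            Err(err) => {
                let _ = tx.send(format!("error event: {err}"));
                let _ = tx.send("task finished: None".to_string());
            }
        }
    });
    handle.await.unwrap();
    while let Ok(msg) = rx.try_recv() {
        println!("{msg}");
    }
}
```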
@@ -5,6 +5,7 @@ use tokio_util::sync::CancellationToken;

 use crate::codex::TurnContext;
 use crate::codex::run_task;
+use crate::error::Result as CodexResult;
 use crate::state::TaskKind;
 use codex_protocol::user_input::UserInput;

@@ -26,7 +27,7 @@ impl SessionTask for RegularTask {
         ctx: Arc<TurnContext>,
         input: Vec<UserInput>,
         cancellation_token: CancellationToken,
-    ) -> Option<String> {
+    ) -> CodexResult<Option<String>> {
         let sess = session.clone_session();
         run_task(sess, ctx, input, TaskKind::Regular, cancellation_token).await
     }
@@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken;
 use crate::codex::TurnContext;
 use crate::codex::exit_review_mode;
 use crate::codex::run_task;
+use crate::error::Result as CodexResult;
 use crate::state::TaskKind;
 use codex_protocol::user_input::UserInput;

@@ -27,12 +28,12 @@ impl SessionTask for ReviewTask {
         ctx: Arc<TurnContext>,
         input: Vec<UserInput>,
         cancellation_token: CancellationToken,
-    ) -> Option<String> {
+    ) -> CodexResult<Option<String>> {
         let sess = session.clone_session();
         run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await
     }

     async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
-        exit_review_mode(session.clone_session(), ctx, None).await;
+        let _ = exit_review_mode(session.clone_session(), ctx, None).await;
     }
 }
@@ -279,11 +279,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
         ev_completed_with_tokens("r2", 330_000),
     ]);

-    let sse3 = sse(vec![
-        ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
-        ev_completed_with_tokens("r3", 200),
-    ]);
-
     let first_matcher = |req: &wiremock::Request| {
         let body = std::str::from_utf8(&req.body).unwrap_or("");
         body.contains(FIRST_AUTO_MSG)
@@ -300,12 +295,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
     };
     mount_sse_once_match(&server, second_matcher, sse2).await;

-    let third_matcher = |req: &wiremock::Request| {
-        let body = std::str::from_utf8(&req.body).unwrap_or("");
-        body.contains("You have exceeded the maximum number of tokens")
-    };
-    mount_sse_once_match(&server, third_matcher, sse3).await;
-
     let model_provider = ModelProviderInfo {
         base_url: Some(format!("{}/v1", server.uri())),
         ..built_in_model_providers()["openai"].clone()
@@ -342,69 +331,28 @@ async fn auto_compact_runs_after_token_limit_hit() {
         .await
         .unwrap();

+    let error_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
+    let EventMsg::Error(error_event) = error_event else {
+        unreachable!("wait_for_event returned unexpected payload");
+    };
+    assert_eq!(error_event.message, "invalid input: input too large");
+
-    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+    // wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;

     let requests = server.received_requests().await.unwrap();
-    assert!(
-        requests.len() >= 3,
-        "auto compact should add at least a third request, got {}",
-        requests.len()
+    assert_eq!(
+        requests.len(),
+        2,
+        "auto compact should reject oversize prompts before issuing another request"
     );
-    let is_auto_compact = |req: &wiremock::Request| {
+    let saw_compact_prompt = requests.iter().any(|req| {
         std::str::from_utf8(&req.body)
             .unwrap_or("")
             .contains("You have exceeded the maximum number of tokens")
-    };
-    let auto_compact_count = requests.iter().filter(|req| is_auto_compact(req)).count();
-    assert_eq!(
-        auto_compact_count, 1,
-        "expected exactly one auto compact request"
-    );
-    let auto_compact_index = requests
-        .iter()
-        .enumerate()
-        .find_map(|(idx, req)| is_auto_compact(req).then_some(idx))
-        .expect("auto compact request missing");
-    assert_eq!(
-        auto_compact_index, 2,
-        "auto compact should add a third request"
-    );
-
-    let body_first = requests[0].body_json::<serde_json::Value>().unwrap();
-    let body3 = requests[auto_compact_index]
-        .body_json::<serde_json::Value>()
-        .unwrap();
-    let instructions = body3
-        .get("instructions")
-        .and_then(|v| v.as_str())
-        .unwrap_or_default();
-    let baseline_instructions = body_first
-        .get("instructions")
-        .and_then(|v| v.as_str())
-        .unwrap_or_default()
-        .to_string();
-    assert_eq!(
-        instructions, baseline_instructions,
-        "auto compact should keep the standard developer instructions",
-    );
-
-    let input3 = body3.get("input").and_then(|v| v.as_array()).unwrap();
-    let last3 = input3
-        .last()
-        .expect("auto compact request should append a user message");
-    assert_eq!(last3.get("type").and_then(|v| v.as_str()), Some("message"));
-    assert_eq!(last3.get("role").and_then(|v| v.as_str()), Some("user"));
-    let last_text = last3
-        .get("content")
-        .and_then(|v| v.as_array())
-        .and_then(|items| items.first())
-        .and_then(|item| item.get("text"))
-        .and_then(|text| text.as_str())
-        .unwrap_or_default();
-    assert_eq!(
-        last_text, SUMMARIZATION_PROMPT,
-        "auto compact should send the summarization prompt as a user message",
+    });
+    assert!(
+        !saw_compact_prompt,
+        "no auto compact request should be sent when the summarization prompt exceeds the limit"
     );
 }

@@ -869,7 +817,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {

     let server = start_mock_server().await;

-    let context_window = 100;
+    let context_window = 20_000;
     let limit = context_window * 90 / 100;
     let over_limit_tokens = context_window * 95 / 100 + 1;

 69  codex-rs/core/tests/suite/input_validation.rs  (new file)
@@ -0,0 +1,69 @@
+use codex_core::protocol::EventMsg;
+use codex_core::protocol::Op;
+use codex_protocol::user_input::UserInput;
+use core_test_support::responses;
+use core_test_support::responses::ev_assistant_message;
+use core_test_support::responses::ev_completed;
+use core_test_support::responses::ev_response_created;
+use core_test_support::responses::sse;
+use core_test_support::responses::start_mock_server;
+use core_test_support::test_codex::test_codex;
+use core_test_support::wait_for_event_with_timeout;
+use std::sync::Arc;
+use std::time::Duration;
+use wiremock::matchers::any;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn input_validation_should_fail_for_too_large_input() {
+    let server = start_mock_server().await;
+
+    let fixture = test_codex().build(&server).await.unwrap();
+    let codex = Arc::clone(&fixture.codex);
+
+    // First: normal message with a mocked assistant response
+    let first_response = sse(vec![
+        ev_response_created("resp-1"),
+        ev_assistant_message("msg-1", "ok"),
+        ev_completed("resp-1"),
+    ]);
+    responses::mount_sse_once_match(&server, any(), first_response).await;
+
+    codex
+        .submit(Op::UserInput {
+            items: vec![UserInput::Text {
+                text: "hello world".into(),
+            }],
+        })
+        .await
+        .unwrap();
+
+    // Wait for the normal turn to complete before sending the oversized input
+    let turn_timeout = Duration::from_secs(1);
+    wait_for_event_with_timeout(
+        &codex,
+        |ev| matches!(ev, EventMsg::TaskComplete(_)),
+        turn_timeout,
+    )
+    .await;
+
+    // Then: 300k-token message should trigger validation error
+    let wait_timeout = Duration::from_millis(100);
+    let input_300_tokens = "token ".repeat(300_000);
+
+    codex
+        .submit(Op::UserInput {
+            items: vec![UserInput::Text {
+                text: input_300_tokens,
+            }],
+        })
+        .await
+        .unwrap();
+
+    let error_event =
+        wait_for_event_with_timeout(&codex, |ev| matches!(ev, EventMsg::Error(_)), wait_timeout)
+            .await;
+    let EventMsg::Error(error_event) = error_event else {
+        unreachable!("wait_for_event_with_timeout returned unexpected payload");
+    };
+    assert_eq!(error_event.message, "invalid input: input too large");
+}
@@ -13,6 +13,7 @@ mod compact_resume_fork;
 mod exec;
 mod fork_conversation;
 mod grep_files;
+mod input_validation;
 mod items;
 mod json_result;
 mod list_dir;
@@ -575,9 +575,11 @@ pub struct TokenUsage {
     pub total_tokens: i64,
 }

-#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
+#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS, Default)]
 pub struct TokenUsageInfo {
     /// The total token usage for the session. accumulated from all turns.
     pub total_token_usage: TokenUsage,
     /// The token usage for the last turn. Received from the API. It's total tokens is the whole window size.
     pub last_token_usage: TokenUsage,
+    #[ts(type = "number | null")]
+    pub model_context_window: Option<i64>,
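Presumably the `Default` derive is there so an empty usage record can be constructed cheaply, which also matches the new session-state logic that hides all-zero usage. A simplified sketch of that interaction, using stand-in structs rather than the real protocol types:

```rust
// Stand-ins for TokenUsage / TokenUsageInfo to show why an all-zero default
// is treated as "no usage yet" by the filter added in SessionState.
#[derive(Debug, Clone, Default, PartialEq)]
struct TokenUsage {
    total_tokens: i64,
}

#[derive(Debug, Clone, Default, PartialEq)]
struct TokenUsageInfo {
    total_token_usage: TokenUsage,
    last_token_usage: TokenUsage,
    model_context_window: Option<i64>,
}

fn main() {
    let info = TokenUsageInfo::default();
    let is_zero =
        info.total_token_usage.total_tokens == 0 && info.last_token_usage.total_tokens == 0;
    // Mirrors token_info_and_rate_limits(): all-zero info is reported as None.
    assert!(is_zero);
}
```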
@@ -107,6 +107,12 @@ impl Tokenizer {
     }
 }

+impl fmt::Debug for Tokenizer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Tokenizer").finish()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;