Compare commits

...

7 Commits

Author SHA1 Message Date
Ahmed Ibrahim
aa992daf3c fix 2026-02-08 21:55:43 -08:00
Ahmed Ibrahim
1bef42a91b tests 2026-02-08 21:37:26 -08:00
Ahmed Ibrahim
316637aa34 fix 2026-02-08 21:37:07 -08:00
Ahmed Ibrahim
0fd69f1b37 Extract ContextDiscoverable history handling 2026-02-08 20:02:58 -08:00
Ahmed Ibrahim
ed4dd2b10e fix 2026-02-08 19:02:22 -08:00
Ahmed Ibrahim
508e9ebe32 Update app-server protocol schema fixtures for ResponseItem refactor 2026-02-08 18:27:46 -08:00
Ahmed Ibrahim
2e267dec61 Refactor ResponseItem to typed variant payload structs 2026-02-08 18:18:51 -08:00
61 changed files with 1729 additions and 951 deletions

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type Compaction = { encrypted_content: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type CustomToolCall = { status?: string, call_id: string, name: string, input: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type CustomToolCallOutput = { call_id: string, output: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type FunctionCall = { name: string, arguments: string, call_id: string, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload";
export type FunctionCallOutput = { call_id: string, output: FunctionCallOutputPayload, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { GhostCommit } from "./GhostCommit";
export type GhostSnapshot = { ghost_commit: GhostCommit, };

View File

@@ -0,0 +1,11 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { LocalShellAction } from "./LocalShellAction";
import type { LocalShellStatus } from "./LocalShellStatus";
export type LocalShellCall = {
/**
* Set when using the Responses API.
*/
call_id: string | null, status: LocalShellStatus, action: LocalShellAction, };

View File

@@ -0,0 +1,7 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ContentItem } from "./ContentItem";
import type { MessagePhase } from "./MessagePhase";
export type Message = { role: string, content: Array<ContentItem>, end_turn?: boolean, phase?: MessagePhase, };

View File

@@ -0,0 +1,7 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ReasoningItemContent } from "./ReasoningItemContent";
import type { ReasoningItemReasoningSummary } from "./ReasoningItemReasoningSummary";
export type Reasoning = { summary: Array<ReasoningItemReasoningSummary>, content?: Array<ReasoningItemContent>, encrypted_content: string | null, };

View File

@@ -1,18 +1,15 @@
// GENERATED CODE! DO NOT MODIFY BY HAND! // GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ContentItem } from "./ContentItem"; import type { Compaction } from "./Compaction";
import type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload"; import type { CustomToolCall } from "./CustomToolCall";
import type { GhostCommit } from "./GhostCommit"; import type { CustomToolCallOutput } from "./CustomToolCallOutput";
import type { LocalShellAction } from "./LocalShellAction"; import type { FunctionCall } from "./FunctionCall";
import type { LocalShellStatus } from "./LocalShellStatus"; import type { FunctionCallOutput } from "./FunctionCallOutput";
import type { MessagePhase } from "./MessagePhase"; import type { GhostSnapshot } from "./GhostSnapshot";
import type { ReasoningItemContent } from "./ReasoningItemContent"; import type { LocalShellCall } from "./LocalShellCall";
import type { ReasoningItemReasoningSummary } from "./ReasoningItemReasoningSummary"; import type { Message } from "./Message";
import type { WebSearchAction } from "./WebSearchAction"; import type { Reasoning } from "./Reasoning";
import type { WebSearchCall } from "./WebSearchCall";
export type ResponseItem = { "type": "message", role: string, content: Array<ContentItem>, end_turn?: boolean, phase?: MessagePhase, } | { "type": "reasoning", summary: Array<ReasoningItemReasoningSummary>, content?: Array<ReasoningItemContent>, encrypted_content: string | null, } | { "type": "local_shell_call", export type ResponseItem = { "type": "message" } & Message | { "type": "reasoning" } & Reasoning | { "type": "local_shell_call" } & LocalShellCall | { "type": "function_call" } & FunctionCall | { "type": "function_call_output" } & FunctionCallOutput | { "type": "custom_tool_call" } & CustomToolCall | { "type": "custom_tool_call_output" } & CustomToolCallOutput | { "type": "web_search_call" } & WebSearchCall | { "type": "ghost_snapshot" } & GhostSnapshot | { "type": "compaction" } & Compaction | { "type": "other" };
/**
* Set when using the Responses API.
*/
call_id: string | null, status: LocalShellStatus, action: LocalShellAction, } | { "type": "function_call", name: string, arguments: string, call_id: string, } | { "type": "function_call_output", call_id: string, output: FunctionCallOutputPayload, } | { "type": "custom_tool_call", status?: string, call_id: string, name: string, input: string, } | { "type": "custom_tool_call_output", call_id: string, output: string, } | { "type": "web_search_call", status?: string, action?: WebSearchAction, } | { "type": "ghost_snapshot", ghost_commit: GhostCommit, } | { "type": "compaction", encrypted_content: string, } | { "type": "other" };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { WebSearchAction } from "./WebSearchAction";
export type WebSearchCall = { status?: string, action?: WebSearchAction, };

View File

@@ -43,6 +43,7 @@ export type { CollabWaitingBeginEvent } from "./CollabWaitingBeginEvent";
export type { CollabWaitingEndEvent } from "./CollabWaitingEndEvent"; export type { CollabWaitingEndEvent } from "./CollabWaitingEndEvent";
export type { CollaborationMode } from "./CollaborationMode"; export type { CollaborationMode } from "./CollaborationMode";
export type { CollaborationModeMask } from "./CollaborationModeMask"; export type { CollaborationModeMask } from "./CollaborationModeMask";
export type { Compaction } from "./Compaction";
export type { ContentItem } from "./ContentItem"; export type { ContentItem } from "./ContentItem";
export type { ContextCompactedEvent } from "./ContextCompactedEvent"; export type { ContextCompactedEvent } from "./ContextCompactedEvent";
export type { ContextCompactionItem } from "./ContextCompactionItem"; export type { ContextCompactionItem } from "./ContextCompactionItem";
@@ -50,6 +51,8 @@ export type { ConversationGitInfo } from "./ConversationGitInfo";
export type { ConversationSummary } from "./ConversationSummary"; export type { ConversationSummary } from "./ConversationSummary";
export type { CreditsSnapshot } from "./CreditsSnapshot"; export type { CreditsSnapshot } from "./CreditsSnapshot";
export type { CustomPrompt } from "./CustomPrompt"; export type { CustomPrompt } from "./CustomPrompt";
export type { CustomToolCall } from "./CustomToolCall";
export type { CustomToolCallOutput } from "./CustomToolCallOutput";
export type { DeprecationNoticeEvent } from "./DeprecationNoticeEvent"; export type { DeprecationNoticeEvent } from "./DeprecationNoticeEvent";
export type { DynamicToolCallRequest } from "./DynamicToolCallRequest"; export type { DynamicToolCallRequest } from "./DynamicToolCallRequest";
export type { ElicitationRequestEvent } from "./ElicitationRequestEvent"; export type { ElicitationRequestEvent } from "./ElicitationRequestEvent";
@@ -71,6 +74,8 @@ export type { FileChange } from "./FileChange";
export type { ForcedLoginMethod } from "./ForcedLoginMethod"; export type { ForcedLoginMethod } from "./ForcedLoginMethod";
export type { ForkConversationParams } from "./ForkConversationParams"; export type { ForkConversationParams } from "./ForkConversationParams";
export type { ForkConversationResponse } from "./ForkConversationResponse"; export type { ForkConversationResponse } from "./ForkConversationResponse";
export type { FunctionCall } from "./FunctionCall";
export type { FunctionCallOutput } from "./FunctionCallOutput";
export type { FunctionCallOutputBody } from "./FunctionCallOutputBody"; export type { FunctionCallOutputBody } from "./FunctionCallOutputBody";
export type { FunctionCallOutputContentItem } from "./FunctionCallOutputContentItem"; export type { FunctionCallOutputContentItem } from "./FunctionCallOutputContentItem";
export type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload"; export type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload";
@@ -85,6 +90,7 @@ export type { GetHistoryEntryResponseEvent } from "./GetHistoryEntryResponseEven
export type { GetUserAgentResponse } from "./GetUserAgentResponse"; export type { GetUserAgentResponse } from "./GetUserAgentResponse";
export type { GetUserSavedConfigResponse } from "./GetUserSavedConfigResponse"; export type { GetUserSavedConfigResponse } from "./GetUserSavedConfigResponse";
export type { GhostCommit } from "./GhostCommit"; export type { GhostCommit } from "./GhostCommit";
export type { GhostSnapshot } from "./GhostSnapshot";
export type { GitDiffToRemoteParams } from "./GitDiffToRemoteParams"; export type { GitDiffToRemoteParams } from "./GitDiffToRemoteParams";
export type { GitDiffToRemoteResponse } from "./GitDiffToRemoteResponse"; export type { GitDiffToRemoteResponse } from "./GitDiffToRemoteResponse";
export type { GitSha } from "./GitSha"; export type { GitSha } from "./GitSha";
@@ -104,6 +110,7 @@ export type { ListCustomPromptsResponseEvent } from "./ListCustomPromptsResponse
export type { ListRemoteSkillsResponseEvent } from "./ListRemoteSkillsResponseEvent"; export type { ListRemoteSkillsResponseEvent } from "./ListRemoteSkillsResponseEvent";
export type { ListSkillsResponseEvent } from "./ListSkillsResponseEvent"; export type { ListSkillsResponseEvent } from "./ListSkillsResponseEvent";
export type { LocalShellAction } from "./LocalShellAction"; export type { LocalShellAction } from "./LocalShellAction";
export type { LocalShellCall } from "./LocalShellCall";
export type { LocalShellExecAction } from "./LocalShellExecAction"; export type { LocalShellExecAction } from "./LocalShellExecAction";
export type { LocalShellStatus } from "./LocalShellStatus"; export type { LocalShellStatus } from "./LocalShellStatus";
export type { LoginApiKeyParams } from "./LoginApiKeyParams"; export type { LoginApiKeyParams } from "./LoginApiKeyParams";
@@ -120,6 +127,7 @@ export type { McpStartupStatus } from "./McpStartupStatus";
export type { McpStartupUpdateEvent } from "./McpStartupUpdateEvent"; export type { McpStartupUpdateEvent } from "./McpStartupUpdateEvent";
export type { McpToolCallBeginEvent } from "./McpToolCallBeginEvent"; export type { McpToolCallBeginEvent } from "./McpToolCallBeginEvent";
export type { McpToolCallEndEvent } from "./McpToolCallEndEvent"; export type { McpToolCallEndEvent } from "./McpToolCallEndEvent";
export type { Message } from "./Message";
export type { MessagePhase } from "./MessagePhase"; export type { MessagePhase } from "./MessagePhase";
export type { ModeKind } from "./ModeKind"; export type { ModeKind } from "./ModeKind";
export type { NetworkAccess } from "./NetworkAccess"; export type { NetworkAccess } from "./NetworkAccess";
@@ -137,6 +145,7 @@ export type { Profile } from "./Profile";
export type { RateLimitSnapshot } from "./RateLimitSnapshot"; export type { RateLimitSnapshot } from "./RateLimitSnapshot";
export type { RateLimitWindow } from "./RateLimitWindow"; export type { RateLimitWindow } from "./RateLimitWindow";
export type { RawResponseItemEvent } from "./RawResponseItemEvent"; export type { RawResponseItemEvent } from "./RawResponseItemEvent";
export type { Reasoning } from "./Reasoning";
export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent"; export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent";
export type { ReasoningEffort } from "./ReasoningEffort"; export type { ReasoningEffort } from "./ReasoningEffort";
export type { ReasoningItem } from "./ReasoningItem"; export type { ReasoningItem } from "./ReasoningItem";
@@ -218,6 +227,7 @@ export type { ViewImageToolCallEvent } from "./ViewImageToolCallEvent";
export type { WarningEvent } from "./WarningEvent"; export type { WarningEvent } from "./WarningEvent";
export type { WebSearchAction } from "./WebSearchAction"; export type { WebSearchAction } from "./WebSearchAction";
export type { WebSearchBeginEvent } from "./WebSearchBeginEvent"; export type { WebSearchBeginEvent } from "./WebSearchBeginEvent";
export type { WebSearchCall } from "./WebSearchCall";
export type { WebSearchEndEvent } from "./WebSearchEndEvent"; export type { WebSearchEndEvent } from "./WebSearchEndEvent";
export type { WebSearchItem } from "./WebSearchItem"; export type { WebSearchItem } from "./WebSearchItem";
export type { WebSearchMode } from "./WebSearchMode"; export type { WebSearchMode } from "./WebSearchMode";

View File

@@ -301,7 +301,7 @@ async fn test_list_and_resume_conversations() -> Result<()> {
// Resuming with explicit history should succeed even without a stored rollout. // Resuming with explicit history should succeed even without a stored rollout.
let fork_history_text = "Hello from history"; let fork_history_text = "Hello from history";
let history = vec![ResponseItem::Message { let history = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -309,7 +309,7 @@ async fn test_list_and_resume_conversations() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let resume_with_history_req_id = mcp let resume_with_history_req_id = mcp
.send_resume_conversation_request(ResumeConversationParams { .send_resume_conversation_request(ResumeConversationParams {
path: None, path: None,

View File

@@ -445,7 +445,10 @@ async fn read_raw_response_item(mcp: &mut McpProcess, conversation_id: ThreadId)
// Ghost snapshots are produced concurrently and may arrive before the model reply. // Ghost snapshots are produced concurrently and may arrive before the model reply.
let event: RawResponseItemEvent = let event: RawResponseItemEvent =
serde_json::from_value(msg_value).expect("deserialize raw response item"); serde_json::from_value(msg_value).expect("deserialize raw response item");
if !matches!(event.item, ResponseItem::GhostSnapshot { .. }) { if !matches!(
event.item,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. })
) {
return event.item; return event.item;
} }
} }
@@ -453,7 +456,7 @@ async fn read_raw_response_item(mcp: &mut McpProcess, conversation_id: ThreadId)
fn assert_instructions_message(item: &ResponseItem) { fn assert_instructions_message(item: &ResponseItem) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "user"); assert_eq!(role, "user");
let texts = content_texts(content); let texts = content_texts(content);
let is_instructions = texts let is_instructions = texts
@@ -470,7 +473,7 @@ fn assert_instructions_message(item: &ResponseItem) {
fn assert_permissions_message(item: &ResponseItem) { fn assert_permissions_message(item: &ResponseItem) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "developer"); assert_eq!(role, "developer");
let texts = content_texts(content); let texts = content_texts(content);
let expected = DeveloperInstructions::from_policy( let expected = DeveloperInstructions::from_policy(
@@ -493,7 +496,7 @@ fn assert_permissions_message(item: &ResponseItem) {
fn assert_developer_message(item: &ResponseItem, expected_text: &str) { fn assert_developer_message(item: &ResponseItem, expected_text: &str) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "developer"); assert_eq!(role, "developer");
let texts = content_texts(content); let texts = content_texts(content);
assert_eq!( assert_eq!(
@@ -508,7 +511,7 @@ fn assert_developer_message(item: &ResponseItem, expected_text: &str) {
fn assert_environment_message(item: &ResponseItem) { fn assert_environment_message(item: &ResponseItem) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "user"); assert_eq!(role, "user");
let texts = content_texts(content); let texts = content_texts(content);
assert!( assert!(
@@ -524,7 +527,7 @@ fn assert_environment_message(item: &ResponseItem) {
fn assert_user_message(item: &ResponseItem, expected_text: &str) { fn assert_user_message(item: &ResponseItem, expected_text: &str) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "user"); assert_eq!(role, "user");
let texts = content_texts(content); let texts = content_texts(content);
assert_eq!(texts, vec![expected_text]); assert_eq!(texts, vec![expected_text]);
@@ -535,7 +538,7 @@ fn assert_user_message(item: &ResponseItem, expected_text: &str) {
fn assert_assistant_message(item: &ResponseItem, expected_text: &str) { fn assert_assistant_message(item: &ResponseItem, expected_text: &str) {
match item { match item {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "assistant"); assert_eq!(role, "assistant");
let texts = content_texts(content); let texts = content_texts(content);
assert_eq!(texts, vec![expected_text]); assert_eq!(texts, vec![expected_text]);

View File

@@ -123,7 +123,7 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
let responses_log = responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await; let responses_log = responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -131,10 +131,10 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
&server, &server,

View File

@@ -392,7 +392,7 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?; let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let history_text = "Hello from history"; let history_text = "Hello from history";
let history = vec![ResponseItem::Message { let history = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -400,7 +400,7 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
// Resume with explicit history and override the model. // Resume with explicit history and override the model.
let resume_id = mcp let resume_id = mcp

View File

@@ -36,12 +36,15 @@ impl Stream for AggregatedStream {
Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => { Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
let is_assistant_message = matches!( let is_assistant_message = matches!(
&item, &item,
ResponseItem::Message { role, .. } if role == "assistant" ResponseItem::Message(codex_protocol::models::Message { role, .. }) if role == "assistant"
); );
if is_assistant_message { if is_assistant_message {
if this.cumulative.is_empty() if this.cumulative.is_empty()
&& let ResponseItem::Message { content, .. } = &item && let ResponseItem::Message(codex_protocol::models::Message {
content,
..
}) = &item
&& let Some(text) = content.iter().find_map(|c| match c { && let Some(text) = content.iter().find_map(|c| match c {
ContentItem::OutputText { text } => Some(text), ContentItem::OutputText { text } => Some(text),
_ => None, _ => None,
@@ -70,29 +73,31 @@ impl Stream for AggregatedStream {
let mut emitted_any = false; let mut emitted_any = false;
if !this.cumulative_reasoning.is_empty() { if !this.cumulative_reasoning.is_empty() {
let aggregated_reasoning = ResponseItem::Reasoning { let aggregated_reasoning =
id: String::new(), ResponseItem::Reasoning(codex_protocol::models::Reasoning {
summary: Vec::new(), id: String::new(),
content: Some(vec![ReasoningItemContent::ReasoningText { summary: Vec::new(),
text: std::mem::take(&mut this.cumulative_reasoning), content: Some(vec![ReasoningItemContent::ReasoningText {
}]), text: std::mem::take(&mut this.cumulative_reasoning),
encrypted_content: None, }]),
}; encrypted_content: None,
});
this.pending this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning)); .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
emitted_any = true; emitted_any = true;
} }
if !this.cumulative.is_empty() { if !this.cumulative.is_empty() {
let aggregated_message = ResponseItem::Message { let aggregated_message =
id: None, ResponseItem::Message(codex_protocol::models::Message {
role: "assistant".to_string(), id: None,
content: vec![ContentItem::OutputText { role: "assistant".to_string(),
text: std::mem::take(&mut this.cumulative), content: vec![ContentItem::OutputText {
}], text: std::mem::take(&mut this.cumulative),
end_turn: None, }],
phase: None, end_turn: None,
}; phase: None,
});
this.pending this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_message)); .push_back(ResponseEvent::OutputItemDone(aggregated_message));
emitted_any = true; emitted_any = true;

View File

@@ -169,12 +169,24 @@ fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
}; };
for (value, item) in items.iter_mut().zip(original_items.iter()) { for (value, item) in items.iter_mut().zip(original_items.iter()) {
if let ResponseItem::Reasoning { id, .. } if let ResponseItem::Reasoning(codex_protocol::models::Reasoning { id, .. })
| ResponseItem::Message { id: Some(id), .. } | ResponseItem::Message(codex_protocol::models::Message { id: Some(id), .. })
| ResponseItem::WebSearchCall { id: Some(id), .. } | ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
| ResponseItem::FunctionCall { id: Some(id), .. } id: Some(id),
| ResponseItem::LocalShellCall { id: Some(id), .. } ..
| ResponseItem::CustomToolCall { id: Some(id), .. } = item })
| ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
id: Some(id),
..
})
| ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
id: Some(id),
..
})
| ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
id: Some(id),
..
}) = item
{ {
if id.is_empty() { if id.is_empty() {
continue; continue;
@@ -217,20 +229,20 @@ mod tests {
fn azure_default_store_attaches_ids_and_headers() { fn azure_default_store_attaches_ids_and_headers() {
let provider = provider("azure", "https://example.openai.azure.com/v1"); let provider = provider("azure", "https://example.openai.azure.com/v1");
let input = vec![ let input = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: Some("m1".into()), id: Some("m1".into()),
role: "assistant".into(), role: "assistant".into(),
content: Vec::new(), content: Vec::new(),
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".into(), role: "assistant".into(),
content: Vec::new(), content: Vec::new(),
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let request = ResponsesRequestBuilder::new("gpt-test", "inst", &input) let request = ResponsesRequestBuilder::new("gpt-test", "inst", &input)

View File

@@ -531,16 +531,16 @@ mod tests {
assert_matches!( assert_matches!(
&events[0], &events[0],
Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { Ok(ResponseEvent::OutputItemDone(ResponseItem::Message(codex_protocol::models::Message {
role, role,
phase: Some(MessagePhase::Commentary), phase: Some(MessagePhase::Commentary),
.. ..
})) if role == "assistant" }))) if role == "assistant"
); );
assert_matches!( assert_matches!(
&events[1], &events[1],
Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. })) Ok(ResponseEvent::OutputItemDone(ResponseItem::Message(codex_protocol::models::Message { role, .. })))
if role == "assistant" if role == "assistant"
); );

View File

@@ -255,7 +255,7 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
let prompt = codex_api::Prompt { let prompt = codex_api::Prompt {
instructions: "Say hi".to_string(), instructions: "Say hi".to_string(),
input: vec![ResponseItem::Message { input: vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -263,7 +263,7 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}], })],
tools: Vec::<Value>::new(), tools: Vec::<Value>::new(),
parallel_tool_calls: false, parallel_tool_calls: false,
output_schema: None, output_schema: None,

View File

@@ -144,14 +144,20 @@ async fn responses_stream_parses_items_and_completed_end_to_end() -> Result<()>
assert_eq!(events.len(), 3); assert_eq!(events.len(), 3);
match &events[0] { match &events[0] {
ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }) => { ResponseEvent::OutputItemDone(ResponseItem::Message(codex_protocol::models::Message {
role,
..
})) => {
assert_eq!(role, "assistant"); assert_eq!(role, "assistant");
} }
other => panic!("unexpected first event: {other:?}"), other => panic!("unexpected first event: {other:?}"),
} }
match &events[1] { match &events[1] {
ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }) => { ResponseEvent::OutputItemDone(ResponseItem::Message(codex_protocol::models::Message {
role,
..
})) => {
assert_eq!(role, "assistant"); assert_eq!(role, "assistant");
} }
other => panic!("unexpected second event: {other:?}"), other => panic!("unexpected second event: {other:?}"),
@@ -215,7 +221,10 @@ async fn responses_stream_aggregates_output_text_deltas() -> Result<()> {
assert_eq!(events.len(), 2); assert_eq!(events.len(), 2);
match &events[0] { match &events[0] {
ResponseEvent::OutputItemDone(ResponseItem::Message { content, .. }) => { ResponseEvent::OutputItemDone(ResponseItem::Message(codex_protocol::models::Message {
content,
..
})) => {
let mut aggregated = String::new(); let mut aggregated = String::new();
for item in content { for item in content {
if let ContentItem::OutputText { text } = item { if let ContentItem::OutputText { text } = item {

View File

@@ -68,35 +68,45 @@ fn reserialize_shell_outputs(items: &mut [ResponseItem]) {
let mut shell_call_ids: HashSet<String> = HashSet::new(); let mut shell_call_ids: HashSet<String> = HashSet::new();
items.iter_mut().for_each(|item| match item { items.iter_mut().for_each(|item| match item {
ResponseItem::LocalShellCall { call_id, id, .. } => { ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
call_id,
id,
..
}) => {
if let Some(identifier) = call_id.clone().or_else(|| id.clone()) { if let Some(identifier) = call_id.clone().or_else(|| id.clone()) {
shell_call_ids.insert(identifier); shell_call_ids.insert(identifier);
} }
} }
ResponseItem::CustomToolCall { ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
id: _, id: _,
status: _, status: _,
call_id, call_id,
name, name,
input: _, input: _,
} => { }) => {
if name == "apply_patch" { if name == "apply_patch" {
shell_call_ids.insert(call_id.clone()); shell_call_ids.insert(call_id.clone());
} }
} }
ResponseItem::CustomToolCallOutput { call_id, output } => { ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
call_id,
output,
}) => {
if shell_call_ids.remove(call_id) if shell_call_ids.remove(call_id)
&& let Some(structured) = parse_structured_shell_output(output) && let Some(structured) = parse_structured_shell_output(output)
{ {
*output = structured *output = structured
} }
} }
ResponseItem::FunctionCall { name, call_id, .. } ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
if is_shell_tool_name(name) || name == "apply_patch" => name, call_id, ..
{ }) if is_shell_tool_name(name) || name == "apply_patch" => {
shell_call_ids.insert(call_id.clone()); shell_call_ids.insert(call_id.clone());
} }
ResponseItem::FunctionCallOutput { call_id, output } => { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
call_id,
output,
}) => {
if shell_call_ids.remove(call_id) if shell_call_ids.remove(call_id)
&& let Some(structured) = output && let Some(structured) = output
.text_content() .text_content()

View File

@@ -539,10 +539,7 @@ pub(crate) struct TurnContext {
} }
impl TurnContext { impl TurnContext {
pub(crate) fn model_context_window(&self) -> Option<i64> { pub(crate) fn model_context_window(&self) -> Option<i64> {
let effective_context_window_percent = self.model_info.effective_context_window_percent; effective_model_context_window(&self.model_info)
self.model_info.context_window.map(|context_window| {
context_window.saturating_mul(effective_context_window_percent) / 100
})
} }
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf { pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
@@ -684,6 +681,13 @@ impl SessionConfiguration {
} }
} }
fn effective_model_context_window(model_info: &ModelInfo) -> Option<i64> {
let effective_context_window_percent = model_info.effective_context_window_percent;
model_info
.context_window
.map(|context_window| context_window.saturating_mul(effective_context_window_percent) / 100)
}
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub(crate) struct SessionSettingsUpdate { pub(crate) struct SessionSettingsUpdate {
pub(crate) cwd: Option<PathBuf>, pub(crate) cwd: Option<PathBuf>,
@@ -1044,7 +1048,15 @@ impl Session {
} }
}; };
session_configuration.thread_name = thread_name.clone(); session_configuration.thread_name = thread_name.clone();
let state = SessionState::new(session_configuration.clone()); let model_info = models_manager
.get_model_info(session_configuration.collaboration_mode.model(), &config)
.await;
let state = SessionState::new(
session_configuration.clone(),
conversation_id.to_string(),
config.codex_home.clone(),
effective_model_context_window(&model_info),
);
let services = SessionServices { let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
@@ -2029,7 +2041,11 @@ impl Session {
turn_context: &TurnContext, turn_context: &TurnContext,
rollout_items: &[RolloutItem], rollout_items: &[RolloutItem],
) -> Vec<ResponseItem> { ) -> Vec<ResponseItem> {
let mut history = ContextManager::new(); let mut history = ContextManager::new(
Arc::new(self.conversation_id.to_string()),
Arc::new(turn_context.config.codex_home.clone()),
turn_context.model_context_window(),
);
for item in rollout_items { for item in rollout_items {
match item { match item {
RolloutItem::ResponseItem(response_item) => { RolloutItem::ResponseItem(response_item) => {
@@ -2083,7 +2099,7 @@ impl Session {
self.services self.services
.otel_manager .otel_manager
.counter("codex.model_warning", 1, &[]); .counter("codex.model_warning", 1, &[]);
let item = ResponseItem::Message { let item = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -2091,7 +2107,7 @@ impl Session {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
self.record_conversation_items(ctx, &[item]).await; self.record_conversation_items(ctx, &[item]).await;
} }
@@ -4475,7 +4491,7 @@ async fn maybe_complete_plan_item_from_message(
state: &mut PlanModeStreamState, state: &mut PlanModeStreamState,
item: &ResponseItem, item: &ResponseItem,
) { ) {
if let ResponseItem::Message { role, content, .. } = item if let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) = item
&& role == "assistant" && role == "assistant"
{ {
let mut text = String::new(); let mut text = String::new();
@@ -4568,7 +4584,7 @@ async fn handle_assistant_item_done_in_plan_mode(
previously_active_item: Option<&TurnItem>, previously_active_item: Option<&TurnItem>,
last_agent_message: &mut Option<String>, last_agent_message: &mut Option<String>,
) -> bool { ) -> bool {
if let ResponseItem::Message { role, .. } = item if let ResponseItem::Message(codex_protocol::models::Message { role, .. }) = item
&& role == "assistant" && role == "assistant"
{ {
maybe_complete_plan_item_from_message(sess, turn_context, state, item).await; maybe_complete_plan_item_from_message(sess, turn_context, state, item).await;
@@ -4912,7 +4928,7 @@ async fn try_run_sampling_request(
pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> { pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| { responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content, .. } = item { if let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) = item {
if role == "assistant" { if role == "assistant" {
content.iter().rev().find_map(|ci| { content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci { if let ContentItem::OutputText { text } = ci {
@@ -4994,7 +5010,7 @@ mod tests {
} }
fn user_message(text: &str) -> ResponseItem { fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5002,7 +5018,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
fn make_connector(id: &str, name: &str) -> AppInfo { fn make_connector(id: &str, name: &str) -> AppInfo {
@@ -5136,7 +5152,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn reconstruct_history_uses_replacement_history_verbatim() { async fn reconstruct_history_uses_replacement_history_verbatim() {
let (session, turn_context) = make_session_and_context().await; let (session, turn_context) = make_session_and_context().await;
let summary_item = ResponseItem::Message { let summary_item = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5144,10 +5160,10 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let replacement_history = vec![ let replacement_history = vec![
summary_item.clone(), summary_item.clone(),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5155,7 +5171,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let rollout_items = vec![RolloutItem::Compacted(CompactedItem { let rollout_items = vec![RolloutItem::Compacted(CompactedItem {
message: String::new(), message: String::new(),
@@ -5352,7 +5368,7 @@ mod tests {
.await; .await;
let turn_1 = vec![ let turn_1 = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5360,8 +5376,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -5369,12 +5385,12 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
sess.record_into_history(&turn_1, tc.as_ref()).await; sess.record_into_history(&turn_1, tc.as_ref()).await;
let turn_2 = vec![ let turn_2 = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5382,8 +5398,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -5391,7 +5407,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
sess.record_into_history(&turn_2, tc.as_ref()).await; sess.record_into_history(&turn_2, tc.as_ref()).await;
@@ -5416,7 +5432,7 @@ mod tests {
sess.record_into_history(&initial_context, tc.as_ref()) sess.record_into_history(&initial_context, tc.as_ref())
.await; .await;
let turn_1 = vec![ResponseItem::Message { let turn_1 = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -5424,7 +5440,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
sess.record_into_history(&turn_1, tc.as_ref()).await; sess.record_into_history(&turn_1, tc.as_ref()).await;
handlers::thread_rollback(&sess, "sub-1".to_string(), 99).await; handlers::thread_rollback(&sess, "sub-1".to_string(), 99).await;
@@ -5483,6 +5499,7 @@ mod tests {
let codex_home = tempfile::tempdir().expect("create temp dir"); let codex_home = tempfile::tempdir().expect("create temp dir");
let config = build_test_config(codex_home.path()).await; let config = build_test_config(codex_home.path()).await;
let config = Arc::new(config); let config = Arc::new(config);
let conversation_id = ThreadId::default();
let model = ModelsManager::get_model_offline(config.model.as_deref()); let model = ModelsManager::get_model_offline(config.model.as_deref());
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
let reasoning_effort = config.model_reasoning_effort; let reasoning_effort = config.model_reasoning_effort;
@@ -5517,7 +5534,12 @@ mod tests {
dynamic_tools: Vec::new(), dynamic_tools: Vec::new(),
}; };
let mut state = SessionState::new(session_configuration); let mut state = SessionState::new(
session_configuration,
conversation_id.to_string(),
config.codex_home.clone(),
effective_model_context_window(&model_info),
);
let initial = RateLimitSnapshot { let initial = RateLimitSnapshot {
primary: Some(RateLimitWindow { primary: Some(RateLimitWindow {
used_percent: 10.0, used_percent: 10.0,
@@ -5566,6 +5588,7 @@ mod tests {
let codex_home = tempfile::tempdir().expect("create temp dir"); let codex_home = tempfile::tempdir().expect("create temp dir");
let config = build_test_config(codex_home.path()).await; let config = build_test_config(codex_home.path()).await;
let config = Arc::new(config); let config = Arc::new(config);
let conversation_id = ThreadId::default();
let model = ModelsManager::get_model_offline(config.model.as_deref()); let model = ModelsManager::get_model_offline(config.model.as_deref());
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
let reasoning_effort = config.model_reasoning_effort; let reasoning_effort = config.model_reasoning_effort;
@@ -5600,7 +5623,12 @@ mod tests {
dynamic_tools: Vec::new(), dynamic_tools: Vec::new(),
}; };
let mut state = SessionState::new(session_configuration); let mut state = SessionState::new(
session_configuration,
conversation_id.to_string(),
config.codex_home.clone(),
effective_model_context_window(&model_info),
);
let initial = RateLimitSnapshot { let initial = RateLimitSnapshot {
primary: Some(RateLimitWindow { primary: Some(RateLimitWindow {
used_percent: 15.0, used_percent: 15.0,
@@ -5885,7 +5913,12 @@ mod tests {
session_configuration.session_source.clone(), session_configuration.session_source.clone(),
); );
let mut state = SessionState::new(session_configuration.clone()); let mut state = SessionState::new(
session_configuration.clone(),
conversation_id.to_string(),
config.codex_home.clone(),
effective_model_context_window(&model_info),
);
mark_state_initial_context_seeded(&mut state); mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
@@ -6017,7 +6050,12 @@ mod tests {
session_configuration.session_source.clone(), session_configuration.session_source.clone(),
); );
let mut state = SessionState::new(session_configuration.clone()); let mut state = SessionState::new(
session_configuration.clone(),
conversation_id.to_string(),
config.codex_home.clone(),
effective_model_context_window(&model_info),
);
mark_state_initial_context_seeded(&mut state); mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
@@ -6144,7 +6182,7 @@ mod tests {
let last = history_items.last().expect("warning recorded"); let last = history_items.last().expect("warning recorded");
match last { match last {
ResponseItem::Message { role, content, .. } => { ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
assert_eq!(role, "user"); assert_eq!(role, "user");
assert_eq!( assert_eq!(
content, content,
@@ -6282,7 +6320,7 @@ mod tests {
sess.on_task_finished(Arc::clone(&tc), None).await; sess.on_task_finished(Arc::clone(&tc), None).await;
let history = sess.clone_history().await; let history = sess.clone_history().await;
let expected = ResponseItem::Message { let expected = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -6290,7 +6328,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
assert!( assert!(
history.raw_items().iter().any(|item| item == &expected), history.raw_items().iter().any(|item| item == &expected),
"expected pending input to be persisted into history on turn completion" "expected pending input to be persisted into history on turn completion"
@@ -6438,7 +6476,10 @@ mod tests {
// recorded in history for the model. // recorded in history for the model.
assert!( assert!(
history.raw_items().iter().any(|item| { history.raw_items().iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else { let ResponseItem::Message(codex_protocol::models::Message {
role, content, ..
}) = item
else {
return false; return false;
}; };
if role != "user" { if role != "user" {
@@ -6477,13 +6518,13 @@ mod tests {
), ),
turn_context.dynamic_tools.as_slice(), turn_context.dynamic_tools.as_slice(),
); );
let item = ResponseItem::CustomToolCall { let item = ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
id: None, id: None,
status: None, status: None,
call_id: "call-1".to_string(), call_id: "call-1".to_string(),
name: "shell".to_string(), name: "shell".to_string(),
input: "{}".to_string(), input: "{}".to_string(),
}; });
let call = ToolRouter::build_tool_call(session.as_ref(), item.clone()) let call = ToolRouter::build_tool_call(session.as_ref(), item.clone())
.await .await
@@ -6513,7 +6554,11 @@ mod tests {
turn_context: &TurnContext, turn_context: &TurnContext,
) -> (Vec<RolloutItem>, Vec<ResponseItem>) { ) -> (Vec<RolloutItem>, Vec<ResponseItem>) {
let mut rollout_items = Vec::new(); let mut rollout_items = Vec::new();
let mut live_history = ContextManager::new(); let mut live_history = ContextManager::new(
Arc::new(session.conversation_id.to_string()),
Arc::new(turn_context.config.codex_home.clone()),
turn_context.model_context_window(),
);
let initial_context = session.build_initial_context(turn_context).await; let initial_context = session.build_initial_context(turn_context).await;
for item in &initial_context { for item in &initial_context {
@@ -6521,7 +6566,7 @@ mod tests {
} }
live_history.record_items(initial_context.iter(), turn_context.truncation_policy); live_history.record_items(initial_context.iter(), turn_context.truncation_policy);
let user1 = ResponseItem::Message { let user1 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -6529,11 +6574,11 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&user1), turn_context.truncation_policy); live_history.record_items(std::iter::once(&user1), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user1.clone())); rollout_items.push(RolloutItem::ResponseItem(user1.clone()));
let assistant1 = ResponseItem::Message { let assistant1 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -6541,7 +6586,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&assistant1), turn_context.truncation_policy); live_history.record_items(std::iter::once(&assistant1), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant1.clone())); rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));
@@ -6559,7 +6604,7 @@ mod tests {
replacement_history: None, replacement_history: None,
})); }));
let user2 = ResponseItem::Message { let user2 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -6567,11 +6612,11 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&user2), turn_context.truncation_policy); live_history.record_items(std::iter::once(&user2), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user2.clone())); rollout_items.push(RolloutItem::ResponseItem(user2.clone()));
let assistant2 = ResponseItem::Message { let assistant2 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -6579,7 +6624,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&assistant2), turn_context.truncation_policy); live_history.record_items(std::iter::once(&assistant2), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant2.clone())); rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));
@@ -6597,7 +6642,7 @@ mod tests {
replacement_history: None, replacement_history: None,
})); }));
let user3 = ResponseItem::Message { let user3 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -6605,11 +6650,11 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&user3), turn_context.truncation_policy); live_history.record_items(std::iter::once(&user3), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user3)); rollout_items.push(RolloutItem::ResponseItem(user3));
let assistant3 = ResponseItem::Message { let assistant3 = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -6617,7 +6662,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
live_history.record_items(std::iter::once(&assistant3), turn_context.truncation_policy); live_history.record_items(std::iter::once(&assistant3), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant3)); rollout_items.push(RolloutItem::ResponseItem(assistant3));

View File

@@ -493,13 +493,13 @@ mod tests {
.send(Event { .send(Event {
id: "evt".to_string(), id: "evt".to_string(),
msg: EventMsg::RawResponseItem(RawResponseItemEvent { msg: EventMsg::RawResponseItem(RawResponseItemEvent {
item: ResponseItem::CustomToolCall { item: ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
id: None, id: None,
status: None, status: None,
call_id: "call-1".to_string(), call_id: "call-1".to_string(),
name: "tool".to_string(), name: "tool".to_string(),
input: "{}".to_string(), input: "{}".to_string(),
}, }),
}), }),
}) })
.await .await

View File

@@ -192,7 +192,12 @@ async fn run_compact_task_inner(
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text); let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let ghost_snapshots: Vec<ResponseItem> = history_items let ghost_snapshots: Vec<ResponseItem> = history_items
.iter() .iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .filter(|item| {
matches!(
item,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. })
)
})
.cloned() .cloned()
.collect(); .collect();
new_history.extend(ghost_snapshots); new_history.extend(ghost_snapshots);
@@ -293,11 +298,17 @@ pub(crate) fn process_compacted_history(
/// summary messages because they parse as `TurnItem::UserMessage`. /// summary messages because they parse as `TurnItem::UserMessage`.
fn should_keep_compacted_history_item(item: &ResponseItem) -> bool { fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
match item { match item {
ResponseItem::Message { role, .. } if role == "developer" => false, ResponseItem::Message(codex_protocol::models::Message { role, .. })
ResponseItem::Message { role, .. } if role == "user" => matches!( if role == "developer" =>
crate::event_mapping::parse_turn_item(item), {
Some(TurnItem::UserMessage(_)) false
), }
ResponseItem::Message(codex_protocol::models::Message { role, .. }) if role == "user" => {
matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
)
}
_ => true, _ => true,
} }
} }
@@ -342,7 +353,7 @@ fn build_compacted_history_with_limit(
} }
for message in &selected_messages { for message in &selected_messages {
history.push(ResponseItem::Message { history.push(ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -350,7 +361,7 @@ fn build_compacted_history_with_limit(
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}); }));
} }
let summary_text = if summary_text.is_empty() { let summary_text = if summary_text.is_empty() {
@@ -359,13 +370,13 @@ fn build_compacted_history_with_limit(
summary_text.to_string() summary_text.to_string()
}; };
history.push(ResponseItem::Message { history.push(ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { text: summary_text }], content: vec![ContentItem::InputText { text: summary_text }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}); }));
history history
} }
@@ -456,7 +467,7 @@ mod tests {
#[test] #[test]
fn collect_user_messages_extracts_user_text_only() { fn collect_user_messages_extracts_user_text_only() {
let items = vec![ let items = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: Some("assistant".to_string()), id: Some("assistant".to_string()),
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -464,8 +475,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: Some("user".to_string()), id: Some("user".to_string()),
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -473,7 +484,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Other, ResponseItem::Other,
]; ];
@@ -485,7 +496,7 @@ mod tests {
#[test] #[test]
fn collect_user_messages_filters_session_prefix_entries() { fn collect_user_messages_filters_session_prefix_entries() {
let items = vec![ let items = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -498,8 +509,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -507,8 +518,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -516,7 +527,7 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let collected = collect_user_messages(&items); let collected = collect_user_messages(&items);
@@ -541,12 +552,13 @@ do things
let truncated_message = &history[0]; let truncated_message = &history[0];
let summary_message = &history[1]; let summary_message = &history[1];
let truncated_text = match truncated_message { let truncated_text =
ResponseItem::Message { role, content, .. } if role == "user" => { match truncated_message {
content_items_to_text(content).unwrap_or_default() ResponseItem::Message(codex_protocol::models::Message {
} role, content, ..
other => panic!("unexpected item in history: {other:?}"), }) if role == "user" => content_items_to_text(content).unwrap_or_default(),
}; other => panic!("unexpected item in history: {other:?}"),
};
assert!( assert!(
truncated_text.contains("tokens truncated"), truncated_text.contains("tokens truncated"),
@@ -557,12 +569,13 @@ do things
"truncated user message should not include the full oversized user text" "truncated user message should not include the full oversized user text"
); );
let summary_text = match summary_message { let summary_text =
ResponseItem::Message { role, content, .. } if role == "user" => { match summary_message {
content_items_to_text(content).unwrap_or_default() ResponseItem::Message(codex_protocol::models::Message {
} role, content, ..
other => panic!("unexpected item in history: {other:?}"), }) if role == "user" => content_items_to_text(content).unwrap_or_default(),
}; other => panic!("unexpected item in history: {other:?}"),
};
assert_eq!(summary_text, "SUMMARY"); assert_eq!(summary_text, "SUMMARY");
} }
@@ -579,19 +592,20 @@ do things
); );
let last = history.last().expect("history should have a summary entry"); let last = history.last().expect("history should have a summary entry");
let summary = match last { let summary =
ResponseItem::Message { role, content, .. } if role == "user" => { match last {
content_items_to_text(content).unwrap_or_default() ResponseItem::Message(codex_protocol::models::Message {
} role, content, ..
other => panic!("expected summary message, found {other:?}"), }) if role == "user" => content_items_to_text(content).unwrap_or_default(),
}; other => panic!("expected summary message, found {other:?}"),
};
assert_eq!(summary, summary_text); assert_eq!(summary, summary_text);
} }
#[test] #[test]
fn process_compacted_history_replaces_developer_messages() { fn process_compacted_history_replaces_developer_messages() {
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -599,8 +613,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -608,8 +622,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -617,10 +631,10 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let initial_context = vec![ let initial_context = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -628,8 +642,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -641,8 +655,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -650,12 +664,12 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let refreshed = process_compacted_history(compacted_history, &initial_context); let refreshed = process_compacted_history(compacted_history, &initial_context);
let expected = vec![ let expected = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -663,8 +677,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -676,8 +690,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -685,8 +699,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -694,14 +708,14 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
assert_eq!(refreshed, expected); assert_eq!(refreshed, expected);
} }
#[test] #[test]
fn process_compacted_history_reinjects_full_initial_context() { fn process_compacted_history_reinjects_full_initial_context() {
let compacted_history = vec![ResponseItem::Message { let compacted_history = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -709,9 +723,9 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let initial_context = vec![ let initial_context = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -719,8 +733,8 @@ do things
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -733,8 +747,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -746,8 +760,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -759,12 +773,12 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let refreshed = process_compacted_history(compacted_history, &initial_context); let refreshed = process_compacted_history(compacted_history, &initial_context);
let expected = vec![ let expected = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -772,8 +786,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -786,8 +800,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -799,8 +813,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -812,8 +826,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -821,7 +835,7 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
assert_eq!(refreshed, expected); assert_eq!(refreshed, expected);
} }
@@ -829,7 +843,7 @@ keep me updated
#[test] #[test]
fn process_compacted_history_drops_non_user_content_messages() { fn process_compacted_history_drops_non_user_content_messages() {
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -842,8 +856,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -855,8 +869,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -868,8 +882,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -877,8 +891,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -886,9 +900,9 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let initial_context = vec![ResponseItem::Message { let initial_context = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -896,11 +910,11 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let refreshed = process_compacted_history(compacted_history, &initial_context); let refreshed = process_compacted_history(compacted_history, &initial_context);
let expected = vec![ let expected = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -908,8 +922,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -917,7 +931,7 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
assert_eq!(refreshed, expected); assert_eq!(refreshed, expected);
} }
@@ -925,7 +939,7 @@ keep me updated
#[test] #[test]
fn process_compacted_history_inserts_context_before_last_real_user_message_only() { fn process_compacted_history_inserts_context_before_last_real_user_message_only() {
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -933,8 +947,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -942,8 +956,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -951,9 +965,9 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let initial_context = vec![ResponseItem::Message { let initial_context = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -961,11 +975,11 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let refreshed = process_compacted_history(compacted_history, &initial_context); let refreshed = process_compacted_history(compacted_history, &initial_context);
let expected = vec![ let expected = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -973,8 +987,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -982,8 +996,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -991,8 +1005,8 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -1000,7 +1014,7 @@ keep me updated
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
assert_eq!(refreshed, expected); assert_eq!(refreshed, expected);
} }

View File

@@ -77,7 +77,12 @@ async fn run_remote_compact_task_inner_impl(
let ghost_snapshots: Vec<ResponseItem> = history let ghost_snapshots: Vec<ResponseItem> = history
.raw_items() .raw_items()
.iter() .iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .filter(|item| {
matches!(
item,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. })
)
})
.cloned() .cloned()
.collect(); .collect();

View File

@@ -0,0 +1,158 @@
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_function_output_items_with_policy;
use crate::truncate::truncate_text;
use codex_protocol::models::ContentItem;
use codex_protocol::models::CustomToolCallOutput;
use codex_protocol::models::FunctionCallOutput;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::Message;
use codex_protocol::models::ResponseItem;
use sha2::Digest;
use sha2::Sha256;
use std::path::Path;
pub(super) trait ContextDiscoverable {
fn discoverable_history_item(
&self,
policy: TruncationPolicy,
codex_home: Option<&Path>,
thread_id: Option<&str>,
context_window_tokens: Option<i64>,
current_usage_tokens: i64,
) -> ResponseItem;
}
impl ContextDiscoverable for FunctionCallOutput {
fn discoverable_history_item(
&self,
policy: TruncationPolicy,
_codex_home: Option<&Path>,
_thread_id: Option<&str>,
_context_window_tokens: Option<i64>,
_current_usage_tokens: i64,
) -> ResponseItem {
let body = match &self.output.body {
FunctionCallOutputBody::Text(content) => {
FunctionCallOutputBody::Text(truncate_text(content, policy))
}
FunctionCallOutputBody::ContentItems(items) => FunctionCallOutputBody::ContentItems(
truncate_function_output_items_with_policy(items, policy),
),
};
ResponseItem::FunctionCallOutput(FunctionCallOutput {
call_id: self.call_id.clone(),
output: FunctionCallOutputPayload {
body,
success: self.output.success,
},
})
}
}
impl ContextDiscoverable for CustomToolCallOutput {
fn discoverable_history_item(
&self,
policy: TruncationPolicy,
_codex_home: Option<&Path>,
_thread_id: Option<&str>,
_context_window_tokens: Option<i64>,
_current_usage_tokens: i64,
) -> ResponseItem {
ResponseItem::CustomToolCallOutput(CustomToolCallOutput {
call_id: self.call_id.clone(),
output: truncate_text(&self.output, policy),
})
}
}
impl ContextDiscoverable for Message {
fn discoverable_history_item(
&self,
_policy: TruncationPolicy,
codex_home: Option<&Path>,
thread_id: Option<&str>,
context_window_tokens: Option<i64>,
current_usage_tokens: i64,
) -> ResponseItem {
let (Some(codex_home), Some(thread_id), Some(context_window_tokens)) =
(codex_home, thread_id, context_window_tokens)
else {
return ResponseItem::Message(self.clone());
};
if self.role != "user" {
return ResponseItem::Message(self.clone());
}
let Some(user_text) = extract_message_text(&self.content) else {
return ResponseItem::Message(self.clone());
};
let estimated_tokens = i64::try_from(approx_token_count(&user_text)).unwrap_or(i64::MAX);
if current_usage_tokens.saturating_add(estimated_tokens) <= context_window_tokens {
return ResponseItem::Message(self.clone());
}
let checksum = checksum_hex(&user_text);
let output_path = codex_home
.join("discovarable_items")
.join(thread_id)
.join("user_message")
.join(checksum);
if let Some(parent) = output_path.parent()
&& let Err(err) = std::fs::create_dir_all(parent)
{
tracing::warn!(
path = %parent.display(),
"failed to create discoverable message directory: {err}"
);
return ResponseItem::Message(self.clone());
}
if let Err(err) = std::fs::write(&output_path, &user_text) {
tracing::warn!(
path = %output_path.display(),
"failed to write discoverable user message file: {err}"
);
return ResponseItem::Message(self.clone());
}
let replacement = format!(
"User message was too large. Read it from <{}>",
output_path.display()
);
let mut rewritten = self.clone();
rewritten.content = vec![ContentItem::InputText { text: replacement }];
ResponseItem::Message(rewritten)
}
}
fn extract_message_text(content: &[ContentItem]) -> Option<String> {
let parts: Vec<&str> = content
.iter()
.filter_map(|item| match item {
ContentItem::InputText { text } | ContentItem::OutputText { text }
if !text.trim().is_empty() =>
{
Some(text.as_str())
}
ContentItem::InputText { .. } | ContentItem::OutputText { .. } => None,
ContentItem::InputImage { .. } => None,
})
.collect();
if parts.is_empty() {
None
} else {
Some(parts.join("\n\n"))
}
}
fn checksum_hex(content: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
format!("{:x}", hasher.finalize())
}

View File

@@ -1,4 +1,5 @@
use crate::codex::TurnContext; use crate::codex::TurnContext;
use crate::context_manager::context_discoverable::ContextDiscoverable;
use crate::context_manager::normalize; use crate::context_manager::normalize;
use crate::instructions::SkillInstructions; use crate::instructions::SkillInstructions;
use crate::instructions::UserInstructions; use crate::instructions::UserInstructions;
@@ -6,32 +7,38 @@ use crate::session_prefix::is_session_prefix;
use crate::truncate::TruncationPolicy; use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count; use crate::truncate::approx_token_count;
use crate::truncate::approx_tokens_from_byte_count; use crate::truncate::approx_tokens_from_byte_count;
use crate::truncate::truncate_function_output_items_with_policy;
use crate::truncate::truncate_text;
use crate::user_shell_command::is_user_shell_command_text; use crate::user_shell_command::is_user_shell_command_text;
use codex_protocol::models::BaseInstructions; use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem; use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem; use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::TokenUsageInfo;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
/// Transcript of thread history /// Transcript of thread history
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone)]
pub(crate) struct ContextManager { pub(crate) struct ContextManager {
/// The oldest items are at the beginning of the vector. /// The oldest items are at the beginning of the vector.
items: Vec<ResponseItem>, items: Vec<ResponseItem>,
token_info: Option<TokenUsageInfo>, token_info: Option<TokenUsageInfo>,
thread_id: Arc<String>,
codex_home: Arc<PathBuf>,
} }
impl ContextManager { impl ContextManager {
pub(crate) fn new() -> Self { pub(crate) fn new(
thread_id: Arc<String>,
codex_home: Arc<PathBuf>,
model_context_window: Option<i64>,
) -> Self {
Self { Self {
items: Vec::new(), items: Vec::new(),
token_info: TokenUsageInfo::new_or_append(&None, &None, None), token_info: TokenUsageInfo::new_or_append(&None, &None, model_context_window),
thread_id,
codex_home,
} }
} }
@@ -60,7 +67,10 @@ impl ContextManager {
{ {
for item in items { for item in items {
let item_ref = item.deref(); let item_ref = item.deref();
let is_ghost_snapshot = matches!(item_ref, ResponseItem::GhostSnapshot { .. }); let is_ghost_snapshot = matches!(
item_ref,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. })
);
if !is_api_message(item_ref) && !is_ghost_snapshot { if !is_api_message(item_ref) && !is_ghost_snapshot {
continue; continue;
} }
@@ -74,8 +84,12 @@ impl ContextManager {
/// normalization and drop un-suited items. /// normalization and drop un-suited items.
pub(crate) fn for_prompt(mut self) -> Vec<ResponseItem> { pub(crate) fn for_prompt(mut self) -> Vec<ResponseItem> {
self.normalize_history(); self.normalize_history();
self.items self.items.retain(|item| {
.retain(|item| !matches!(item, ResponseItem::GhostSnapshot { .. })); !matches!(
item,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. })
)
});
self.items self.items
} }
@@ -138,14 +152,17 @@ impl ContextManager {
/// Returns true when a tool image was replaced, false otherwise. /// Returns true when a tool image was replaced, false otherwise.
pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool { pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
let Some(index) = self.items.iter().rposition(|item| { let Some(index) = self.items.iter().rposition(|item| {
matches!(item, ResponseItem::FunctionCallOutput { .. }) matches!(item, ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput { .. }))
|| matches!(item, ResponseItem::Message { role, .. } if role == "user") || matches!(item, ResponseItem::Message(codex_protocol::models::Message { role, .. }) if role == "user")
}) else { }) else {
return false; return false;
}; };
match &mut self.items[index] { match &mut self.items[index] {
ResponseItem::FunctionCallOutput { output, .. } => { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
output,
..
}) => {
let Some(content_items) = output.content_items_mut() else { let Some(content_items) = output.content_items_mut() else {
return false; return false;
}; };
@@ -161,7 +178,11 @@ impl ContextManager {
} }
replaced replaced
} }
ResponseItem::Message { role, .. } if role == "user" => false, ResponseItem::Message(codex_protocol::models::Message { role, .. })
if role == "user" =>
{
false
}
_ => false, _ => false,
} }
} }
@@ -214,7 +235,7 @@ impl ContextManager {
let Some(last_user_index) = self let Some(last_user_index) = self
.items .items
.iter() .iter()
.rposition(|item| matches!(item, ResponseItem::Message { role, .. } if role == "user")) .rposition(|item| matches!(item, ResponseItem::Message(codex_protocol::models::Message { role, .. }) if role == "user"))
else { else {
return 0; return 0;
}; };
@@ -225,10 +246,10 @@ impl ContextManager {
.filter(|item| { .filter(|item| {
matches!( matches!(
item, item,
ResponseItem::Reasoning { ResponseItem::Reasoning(codex_protocol::models::Reasoning {
encrypted_content: Some(_), encrypted_content: Some(_),
.. ..
} })
) )
}) })
.fold(0i64, |acc, item| { .fold(0i64, |acc, item| {
@@ -277,46 +298,46 @@ impl ContextManager {
} }
fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem { fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
let used_tokens = self
.token_info
.as_ref()
.map(|info| info.last_token_usage.total_tokens)
.unwrap_or(0);
let context_window = self
.token_info
.as_ref()
.and_then(|info| info.model_context_window);
let policy_with_serialization_budget = policy * 1.2; let policy_with_serialization_budget = policy * 1.2;
match item { match item {
ResponseItem::FunctionCallOutput { call_id, output } => { ResponseItem::Reasoning(_)
let body = match &output.body { | ResponseItem::LocalShellCall(_)
FunctionCallOutputBody::Text(content) => FunctionCallOutputBody::Text( | ResponseItem::FunctionCall(_)
truncate_text(content, policy_with_serialization_budget), | ResponseItem::CustomToolCall(_)
), | ResponseItem::WebSearchCall(_)
FunctionCallOutputBody::ContentItems(items) => { | ResponseItem::Compaction(_)
FunctionCallOutputBody::ContentItems( | ResponseItem::GhostSnapshot(_)
truncate_function_output_items_with_policy(
items,
policy_with_serialization_budget,
),
)
}
};
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
body,
success: output.success,
},
}
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
let truncated = truncate_text(output, policy_with_serialization_budget);
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: truncated,
}
}
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Other => item.clone(), | ResponseItem::Other => item.clone(),
ResponseItem::FunctionCallOutput(item) => item.discoverable_history_item(
policy_with_serialization_budget,
Some(self.codex_home.as_path()),
Some(self.thread_id.as_str()),
context_window,
used_tokens,
),
ResponseItem::CustomToolCallOutput(item) => item.discoverable_history_item(
policy_with_serialization_budget,
Some(self.codex_home.as_path()),
Some(self.thread_id.as_str()),
context_window,
used_tokens,
),
ResponseItem::Message(item) => item.discoverable_history_item(
policy_with_serialization_budget,
Some(self.codex_home.as_path()),
Some(self.thread_id.as_str()),
context_window,
used_tokens,
),
} }
} }
} }
@@ -325,16 +346,20 @@ impl ContextManager {
/// tool calls, tool outputs, shell calls, and web-search calls). /// tool calls, tool outputs, shell calls, and web-search calls).
fn is_api_message(message: &ResponseItem) -> bool { fn is_api_message(message: &ResponseItem) -> bool {
match message { match message {
ResponseItem::Message { role, .. } => role.as_str() != "system", ResponseItem::Message(codex_protocol::models::Message { role, .. }) => {
ResponseItem::FunctionCallOutput { .. } role.as_str() != "system"
| ResponseItem::FunctionCall { .. } }
| ResponseItem::CustomToolCall { .. } ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput { .. })
| ResponseItem::CustomToolCallOutput { .. } | ResponseItem::FunctionCall(codex_protocol::models::FunctionCall { .. })
| ResponseItem::LocalShellCall { .. } | ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall { .. })
| ResponseItem::Reasoning { .. } | ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
| ResponseItem::WebSearchCall { .. } ..
| ResponseItem::Compaction { .. } => true, })
ResponseItem::GhostSnapshot { .. } => false, | ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall { .. })
| ResponseItem::Reasoning(codex_protocol::models::Reasoning { .. })
| ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall { .. })
| ResponseItem::Compaction(codex_protocol::models::Compaction { .. }) => true,
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. }) => false,
ResponseItem::Other => false, ResponseItem::Other => false,
} }
} }
@@ -349,14 +374,14 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize {
fn estimate_item_token_count(item: &ResponseItem) -> i64 { fn estimate_item_token_count(item: &ResponseItem) -> i64 {
match item { match item {
ResponseItem::GhostSnapshot { .. } => 0, ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. }) => 0,
ResponseItem::Reasoning { ResponseItem::Reasoning(codex_protocol::models::Reasoning {
encrypted_content: Some(content), encrypted_content: Some(content),
.. ..
} })
| ResponseItem::Compaction { | ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: content, encrypted_content: content,
} => { }) => {
let reasoning_bytes = estimate_reasoning_length(content.len()); let reasoning_bytes = estimate_reasoning_length(content.len());
i64::try_from(approx_tokens_from_byte_count(reasoning_bytes)).unwrap_or(i64::MAX) i64::try_from(approx_tokens_from_byte_count(reasoning_bytes)).unwrap_or(i64::MAX)
} }
@@ -370,12 +395,15 @@ fn estimate_item_token_count(item: &ResponseItem) -> i64 {
pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool { pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool {
matches!( matches!(
item, item,
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput { .. })
) || matches!(item, ResponseItem::Message { role, .. } if role == "developer") | ResponseItem::CustomToolCallOutput(
codex_protocol::models::CustomToolCallOutput { .. }
)
) || matches!(item, ResponseItem::Message(codex_protocol::models::Message { role, .. }) if role == "developer")
} }
pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
let ResponseItem::Message { role, content, .. } = item else { let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) = item else {
return false; return false;
}; };

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,4 @@
mod context_discoverable;
mod history; mod history;
mod normalize; mod normalize;

View File

@@ -15,11 +15,15 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
match item { match item {
ResponseItem::FunctionCall { call_id, .. } => { ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
call_id, ..
}) => {
let has_output = items.iter().any(|i| match i { let has_output = items.iter().any(|i| match i {
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(
call_id: existing, .. codex_protocol::models::FunctionCallOutput {
} => existing == call_id, call_id: existing, ..
},
) => existing == call_id,
_ => false, _ => false,
}); });
@@ -27,21 +31,28 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
info!("Function call output is missing for call id: {call_id}"); info!("Function call output is missing for call id: {call_id}");
missing_outputs_to_insert.push(( missing_outputs_to_insert.push((
idx, idx,
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(
call_id: call_id.clone(), codex_protocol::models::FunctionCallOutput {
output: FunctionCallOutputPayload { call_id: call_id.clone(),
body: FunctionCallOutputBody::Text("aborted".to_string()), output: FunctionCallOutputPayload {
..Default::default() body: FunctionCallOutputBody::Text("aborted".to_string()),
..Default::default()
},
}, },
}, ),
)); ));
} }
} }
ResponseItem::CustomToolCall { call_id, .. } => { ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
call_id,
..
}) => {
let has_output = items.iter().any(|i| match i { let has_output = items.iter().any(|i| match i {
ResponseItem::CustomToolCallOutput { ResponseItem::CustomToolCallOutput(
call_id: existing, .. codex_protocol::models::CustomToolCallOutput {
} => existing == call_id, call_id: existing, ..
},
) => existing == call_id,
_ => false, _ => false,
}); });
@@ -51,20 +62,27 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
)); ));
missing_outputs_to_insert.push(( missing_outputs_to_insert.push((
idx, idx,
ResponseItem::CustomToolCallOutput { ResponseItem::CustomToolCallOutput(
call_id: call_id.clone(), codex_protocol::models::CustomToolCallOutput {
output: "aborted".to_string(), call_id: call_id.clone(),
}, output: "aborted".to_string(),
},
),
)); ));
} }
} }
// LocalShellCall is represented in upstream streams by a FunctionCallOutput // LocalShellCall is represented in upstream streams by a FunctionCallOutput
ResponseItem::LocalShellCall { call_id, .. } => { ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
call_id,
..
}) => {
if let Some(call_id) = call_id.as_ref() { if let Some(call_id) = call_id.as_ref() {
let has_output = items.iter().any(|i| match i { let has_output = items.iter().any(|i| match i {
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(
call_id: existing, .. codex_protocol::models::FunctionCallOutput {
} => existing == call_id, call_id: existing, ..
},
) => existing == call_id,
_ => false, _ => false,
}); });
@@ -74,13 +92,15 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
)); ));
missing_outputs_to_insert.push(( missing_outputs_to_insert.push((
idx, idx,
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(
call_id: call_id.clone(), codex_protocol::models::FunctionCallOutput {
output: FunctionCallOutputPayload { call_id: call_id.clone(),
body: FunctionCallOutputBody::Text("aborted".to_string()), output: FunctionCallOutputPayload {
..Default::default() body: FunctionCallOutputBody::Text("aborted".to_string()),
..Default::default()
},
}, },
}, ),
)); ));
} }
} }
@@ -99,7 +119,9 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
let function_call_ids: HashSet<String> = items let function_call_ids: HashSet<String> = items
.iter() .iter()
.filter_map(|i| match i { .filter_map(|i| match i {
ResponseItem::FunctionCall { call_id, .. } => Some(call_id.clone()), ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
call_id, ..
}) => Some(call_id.clone()),
_ => None, _ => None,
}) })
.collect(); .collect();
@@ -107,10 +129,10 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
let local_shell_call_ids: HashSet<String> = items let local_shell_call_ids: HashSet<String> = items
.iter() .iter()
.filter_map(|i| match i { .filter_map(|i| match i {
ResponseItem::LocalShellCall { ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
call_id: Some(call_id), call_id: Some(call_id),
.. ..
} => Some(call_id.clone()), }) => Some(call_id.clone()),
_ => None, _ => None,
}) })
.collect(); .collect();
@@ -118,13 +140,19 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
let custom_tool_call_ids: HashSet<String> = items let custom_tool_call_ids: HashSet<String> = items
.iter() .iter()
.filter_map(|i| match i { .filter_map(|i| match i {
ResponseItem::CustomToolCall { call_id, .. } => Some(call_id.clone()), ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
call_id,
..
}) => Some(call_id.clone()),
_ => None, _ => None,
}) })
.collect(); .collect();
items.retain(|item| match item { items.retain(|item| match item {
ResponseItem::FunctionCallOutput { call_id, .. } => { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
call_id,
..
}) => {
let has_match = let has_match =
function_call_ids.contains(call_id) || local_shell_call_ids.contains(call_id); function_call_ids.contains(call_id) || local_shell_call_ids.contains(call_id);
if !has_match { if !has_match {
@@ -134,7 +162,10 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
} }
has_match has_match
} }
ResponseItem::CustomToolCallOutput { call_id, .. } => { ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
call_id,
..
}) => {
let has_match = custom_tool_call_ids.contains(call_id); let has_match = custom_tool_call_ids.contains(call_id);
if !has_match { if !has_match {
error_or_panic(format!( error_or_panic(format!(
@@ -149,53 +180,53 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) { pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) {
match item { match item {
ResponseItem::FunctionCall { call_id, .. } => { ResponseItem::FunctionCall(codex_protocol::models::FunctionCall { call_id, .. }) => {
remove_first_matching(items, |i| { remove_first_matching(items, |i| {
matches!( matches!(
i, i,
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
call_id: existing, .. call_id: existing, ..
} if existing == call_id }) if existing == call_id
) )
}); });
} }
ResponseItem::FunctionCallOutput { call_id, .. } => { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput { call_id, .. }) => {
if let Some(pos) = items.iter().position(|i| { if let Some(pos) = items.iter().position(|i| {
matches!(i, ResponseItem::FunctionCall { call_id: existing, .. } if existing == call_id) matches!(i, ResponseItem::FunctionCall(codex_protocol::models::FunctionCall { call_id: existing, .. }) if existing == call_id)
}) { }) {
items.remove(pos); items.remove(pos);
} else if let Some(pos) = items.iter().position(|i| { } else if let Some(pos) = items.iter().position(|i| {
matches!(i, ResponseItem::LocalShellCall { call_id: Some(existing), .. } if existing == call_id) matches!(i, ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall { call_id: Some(existing), .. }) if existing == call_id)
}) { }) {
items.remove(pos); items.remove(pos);
} }
} }
ResponseItem::CustomToolCall { call_id, .. } => { ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall { call_id, .. }) => {
remove_first_matching(items, |i| { remove_first_matching(items, |i| {
matches!( matches!(
i, i,
ResponseItem::CustomToolCallOutput { ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
call_id: existing, .. call_id: existing, ..
} if existing == call_id }) if existing == call_id
) )
}); });
} }
ResponseItem::CustomToolCallOutput { call_id, .. } => { ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput { call_id, .. }) => {
remove_first_matching( remove_first_matching(
items, items,
|i| matches!(i, ResponseItem::CustomToolCall { call_id: existing, .. } if existing == call_id), |i| matches!(i, ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall { call_id: existing, .. }) if existing == call_id),
); );
} }
ResponseItem::LocalShellCall { ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
call_id: Some(call_id), call_id: Some(call_id),
.. ..
} => { }) => {
remove_first_matching(items, |i| { remove_first_matching(items, |i| {
matches!( matches!(
i, i,
ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
call_id: existing, .. call_id: existing, ..
} if existing == call_id }) if existing == call_id
) )
}); });
} }

View File

@@ -125,7 +125,7 @@ impl EnvironmentContext {
impl From<EnvironmentContext> for ResponseItem { impl From<EnvironmentContext> for ResponseItem {
fn from(ec: EnvironmentContext) -> Self { fn from(ec: EnvironmentContext) -> Self {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -133,7 +133,7 @@ impl From<EnvironmentContext> for ResponseItem {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
} }

View File

@@ -95,13 +95,13 @@ fn parse_agent_message(
pub fn parse_turn_item(item: &ResponseItem) -> Option<TurnItem> { pub fn parse_turn_item(item: &ResponseItem) -> Option<TurnItem> {
match item { match item {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
role, role,
content, content,
id, id,
phase, phase,
.. ..
} => match role.as_str() { }) => match role.as_str() {
"user" => parse_user_message(content).map(TurnItem::UserMessage), "user" => parse_user_message(content).map(TurnItem::UserMessage),
"assistant" => Some(TurnItem::AgentMessage(parse_agent_message( "assistant" => Some(TurnItem::AgentMessage(parse_agent_message(
id.as_ref(), id.as_ref(),
@@ -111,12 +111,12 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option<TurnItem> {
"system" => None, "system" => None,
_ => None, _ => None,
}, },
ResponseItem::Reasoning { ResponseItem::Reasoning(codex_protocol::models::Reasoning {
id, id,
summary, summary,
content, content,
.. ..
} => { }) => {
let summary_text = summary let summary_text = summary
.iter() .iter()
.map(|entry| match entry { .map(|entry| match entry {
@@ -138,7 +138,9 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option<TurnItem> {
raw_content, raw_content,
})) }))
} }
ResponseItem::WebSearchCall { id, action, .. } => { ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
id, action, ..
}) => {
let (action, query) = match action { let (action, query) = match action {
Some(action) => (action.clone(), web_search_action_detail(action)), Some(action) => (action.clone(), web_search_action_detail(action)),
None => (WebSearchAction::Other, String::new()), None => (WebSearchAction::Other, String::new()),
@@ -172,7 +174,7 @@ mod tests {
let img1 = "https://example.com/one.png".to_string(); let img1 = "https://example.com/one.png".to_string();
let img2 = "https://example.com/two.jpg".to_string(); let img2 = "https://example.com/two.jpg".to_string();
let item = ResponseItem::Message { let item = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ content: vec![
@@ -188,7 +190,7 @@ mod tests {
], ],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected user message turn item"); let turn_item = parse_turn_item(&item).expect("expected user message turn item");
@@ -214,7 +216,7 @@ mod tests {
let label = codex_protocol::models::local_image_open_tag_text(1); let label = codex_protocol::models::local_image_open_tag_text(1);
let user_text = "Please review this image.".to_string(); let user_text = "Please review this image.".to_string();
let item = ResponseItem::Message { let item = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ content: vec![
@@ -231,7 +233,7 @@ mod tests {
], ],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected user message turn item"); let turn_item = parse_turn_item(&item).expect("expected user message turn item");
@@ -256,7 +258,7 @@ mod tests {
let label = codex_protocol::models::image_open_tag_text(); let label = codex_protocol::models::image_open_tag_text();
let user_text = "Please review this image.".to_string(); let user_text = "Please review this image.".to_string();
let item = ResponseItem::Message { let item = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ content: vec![
@@ -273,7 +275,7 @@ mod tests {
], ],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected user message turn item"); let turn_item = parse_turn_item(&item).expect("expected user message turn item");
@@ -295,7 +297,7 @@ mod tests {
#[test] #[test]
fn skips_user_instructions_and_env() { fn skips_user_instructions_and_env() {
let items = vec![ let items = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -303,8 +305,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -312,8 +314,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -321,8 +323,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -331,8 +333,8 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -340,7 +342,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
for item in items { for item in items {
@@ -351,7 +353,7 @@ mod tests {
#[test] #[test]
fn parses_agent_message() { fn parses_agent_message() {
let item = ResponseItem::Message { let item = ResponseItem::Message(codex_protocol::models::Message {
id: Some("msg-1".to_string()), id: Some("msg-1".to_string()),
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -359,7 +361,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected agent message turn item"); let turn_item = parse_turn_item(&item).expect("expected agent message turn item");
@@ -376,7 +378,7 @@ mod tests {
#[test] #[test]
fn parses_reasoning_summary_and_raw_content() { fn parses_reasoning_summary_and_raw_content() {
let item = ResponseItem::Reasoning { let item = ResponseItem::Reasoning(codex_protocol::models::Reasoning {
id: "reasoning_1".to_string(), id: "reasoning_1".to_string(),
summary: vec![ summary: vec![
ReasoningItemReasoningSummary::SummaryText { ReasoningItemReasoningSummary::SummaryText {
@@ -390,7 +392,7 @@ mod tests {
text: "raw details".to_string(), text: "raw details".to_string(),
}]), }]),
encrypted_content: None, encrypted_content: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); let turn_item = parse_turn_item(&item).expect("expected reasoning turn item");
@@ -408,7 +410,7 @@ mod tests {
#[test] #[test]
fn parses_reasoning_including_raw_content() { fn parses_reasoning_including_raw_content() {
let item = ResponseItem::Reasoning { let item = ResponseItem::Reasoning(codex_protocol::models::Reasoning {
id: "reasoning_2".to_string(), id: "reasoning_2".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText { summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "Summarized step".to_string(), text: "Summarized step".to_string(),
@@ -422,7 +424,7 @@ mod tests {
}, },
]), ]),
encrypted_content: None, encrypted_content: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); let turn_item = parse_turn_item(&item).expect("expected reasoning turn item");
@@ -440,14 +442,14 @@ mod tests {
#[test] #[test]
fn parses_web_search_call() { fn parses_web_search_call() {
let item = ResponseItem::WebSearchCall { let item = ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
id: Some("ws_1".to_string()), id: Some("ws_1".to_string()),
status: Some("completed".to_string()), status: Some("completed".to_string()),
action: Some(WebSearchAction::Search { action: Some(WebSearchAction::Search {
query: Some("weather".to_string()), query: Some("weather".to_string()),
queries: None, queries: None,
}), }),
}; });
let turn_item = parse_turn_item(&item).expect("expected web search turn item"); let turn_item = parse_turn_item(&item).expect("expected web search turn item");
@@ -469,13 +471,13 @@ mod tests {
#[test] #[test]
fn parses_web_search_open_page_call() { fn parses_web_search_open_page_call() {
let item = ResponseItem::WebSearchCall { let item = ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
id: Some("ws_open".to_string()), id: Some("ws_open".to_string()),
status: Some("completed".to_string()), status: Some("completed".to_string()),
action: Some(WebSearchAction::OpenPage { action: Some(WebSearchAction::OpenPage {
url: Some("https://example.com".to_string()), url: Some("https://example.com".to_string()),
}), }),
}; });
let turn_item = parse_turn_item(&item).expect("expected web search turn item"); let turn_item = parse_turn_item(&item).expect("expected web search turn item");
@@ -496,14 +498,14 @@ mod tests {
#[test] #[test]
fn parses_web_search_find_in_page_call() { fn parses_web_search_find_in_page_call() {
let item = ResponseItem::WebSearchCall { let item = ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
id: Some("ws_find".to_string()), id: Some("ws_find".to_string()),
status: Some("completed".to_string()), status: Some("completed".to_string()),
action: Some(WebSearchAction::FindInPage { action: Some(WebSearchAction::FindInPage {
url: Some("https://example.com".to_string()), url: Some("https://example.com".to_string()),
pattern: Some("needle".to_string()), pattern: Some("needle".to_string()),
}), }),
}; });
let turn_item = parse_turn_item(&item).expect("expected web search turn item"); let turn_item = parse_turn_item(&item).expect("expected web search turn item");
@@ -525,11 +527,11 @@ mod tests {
#[test] #[test]
fn parses_partial_web_search_call_without_action_as_other() { fn parses_partial_web_search_call_without_action_as_other() {
let item = ResponseItem::WebSearchCall { let item = ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall {
id: Some("ws_partial".to_string()), id: Some("ws_partial".to_string()),
status: Some("in_progress".to_string()), status: Some("in_progress".to_string()),
action: None, action: None,
}; });
let turn_item = parse_turn_item(&item).expect("expected web search turn item"); let turn_item = parse_turn_item(&item).expect("expected web search turn item");
match turn_item { match turn_item {

View File

@@ -28,7 +28,7 @@ impl UserInstructions {
impl From<UserInstructions> for ResponseItem { impl From<UserInstructions> for ResponseItem {
fn from(ui: UserInstructions) -> Self { fn from(ui: UserInstructions) -> Self {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -40,7 +40,7 @@ impl From<UserInstructions> for ResponseItem {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
} }
@@ -64,7 +64,7 @@ impl SkillInstructions {
impl From<SkillInstructions> for ResponseItem { impl From<SkillInstructions> for ResponseItem {
fn from(si: SkillInstructions) -> Self { fn from(si: SkillInstructions) -> Self {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -75,7 +75,7 @@ impl From<SkillInstructions> for ResponseItem {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
} }
@@ -92,7 +92,9 @@ mod tests {
}; };
let response_item: ResponseItem = user_instructions.into(); let response_item: ResponseItem = user_instructions.into();
let ResponseItem::Message { role, content, .. } = response_item else { let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) =
response_item
else {
panic!("expected ResponseItem::Message"); panic!("expected ResponseItem::Message");
}; };
@@ -136,7 +138,9 @@ mod tests {
}; };
let response_item: ResponseItem = skill_instructions.into(); let response_item: ResponseItem = skill_instructions.into();
let ResponseItem::Message { role, content, .. } = response_item else { let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) =
response_item
else {
panic!("expected ResponseItem::Message"); panic!("expected ResponseItem::Message");
}; };

View File

@@ -1141,15 +1141,17 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
for idx in 0..total_messages { for idx in 0..total_messages {
let response_line = RolloutLine { let response_line = RolloutLine {
timestamp: format!("{ts}-{idx:02}"), timestamp: format!("{ts}-{idx:02}"),
item: RolloutItem::ResponseItem(ResponseItem::Message { item: RolloutItem::ResponseItem(ResponseItem::Message(
id: None, codex_protocol::models::Message {
role: "assistant".into(), id: None,
content: vec![ContentItem::OutputText { role: "assistant".into(),
text: format!("reply-{idx}"), content: vec![ContentItem::OutputText {
}], text: format!("reply-{idx}"),
end_turn: None, }],
phase: None, end_turn: None,
}), phase: None,
},
)),
}; };
writeln!(file, "{}", serde_json::to_string(&response_line)?)?; writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
} }

View File

@@ -11,7 +11,7 @@ use codex_protocol::protocol::RolloutItem;
/// Return the indices of user message boundaries in a rollout. /// Return the indices of user message boundaries in a rollout.
/// ///
/// A user message boundary is a `RolloutItem::ResponseItem(ResponseItem::Message { .. })` /// A user message boundary is a `RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message { .. }))`
/// whose parsed turn item is `TurnItem::UserMessage`. /// whose parsed turn item is `TurnItem::UserMessage`.
/// ///
/// Rollouts can contain `ThreadRolledBack` markers. Those markers indicate that the /// Rollouts can contain `ThreadRolledBack` markers. Those markers indicate that the
@@ -21,11 +21,12 @@ pub(crate) fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<us
let mut user_positions = Vec::new(); let mut user_positions = Vec::new();
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
match item { match item {
RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) RolloutItem::ResponseItem(
if matches!( item @ ResponseItem::Message(codex_protocol::models::Message { .. }),
event_mapping::parse_turn_item(item), ) if matches!(
Some(TurnItem::UserMessage(_)) event_mapping::parse_turn_item(item),
) => Some(TurnItem::UserMessage(_))
) =>
{ {
user_positions.push(idx); user_positions.push(idx);
} }
@@ -79,7 +80,7 @@ mod tests {
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
fn user_msg(text: &str) -> ResponseItem { fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -87,11 +88,11 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
fn assistant_msg(text: &str) -> ResponseItem { fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -99,7 +100,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
#[test] #[test]
@@ -110,20 +111,20 @@ mod tests {
assistant_msg("a2"), assistant_msg("a2"),
user_msg("u2"), user_msg("u2"),
assistant_msg("a3"), assistant_msg("a3"),
ResponseItem::Reasoning { ResponseItem::Reasoning(codex_protocol::models::Reasoning {
id: "r1".to_string(), id: "r1".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText { summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "s".to_string(), text: "s".to_string(),
}], }],
content: None, content: None,
encrypted_content: None, encrypted_content: None,
}, }),
ResponseItem::FunctionCall { ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
id: None, id: None,
name: "tool".to_string(), name: "tool".to_string(),
arguments: "{}".to_string(), arguments: "{}".to_string(),
call_id: "c1".to_string(), call_id: "c1".to_string(),
}, }),
assistant_msg("a4"), assistant_msg("a4"),
]; ];

View File

@@ -3,6 +3,8 @@
use codex_protocol::models::ResponseItem; use codex_protocol::models::ResponseItem;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use crate::codex::SessionConfiguration; use crate::codex::SessionConfiguration;
use crate::context_manager::ContextManager; use crate::context_manager::ContextManager;
@@ -30,8 +32,17 @@ pub(crate) struct SessionState {
impl SessionState { impl SessionState {
/// Create a new session state mirroring previous `State::default()` semantics. /// Create a new session state mirroring previous `State::default()` semantics.
pub(crate) fn new(session_configuration: SessionConfiguration) -> Self { pub(crate) fn new(
let history = ContextManager::new(); session_configuration: SessionConfiguration,
thread_id: String,
codex_home: PathBuf,
model_context_window: Option<i64>,
) -> Self {
let history = ContextManager::new(
Arc::new(thread_id),
Arc::new(codex_home),
model_context_window,
);
Self { Self {
session_configuration, session_configuration,
history, history,

View File

@@ -165,9 +165,9 @@ pub(crate) async fn handle_non_tool_response_item(
debug!(?item, "Output item"); debug!(?item, "Output item");
match item { match item {
ResponseItem::Message { .. } ResponseItem::Message(codex_protocol::models::Message { .. })
| ResponseItem::Reasoning { .. } | ResponseItem::Reasoning(codex_protocol::models::Reasoning { .. })
| ResponseItem::WebSearchCall { .. } => { | ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall { .. }) => {
let mut turn_item = parse_turn_item(item)?; let mut turn_item = parse_turn_item(item)?;
if plan_mode && let TurnItem::AgentMessage(agent_message) = &mut turn_item { if plan_mode && let TurnItem::AgentMessage(agent_message) = &mut turn_item {
let combined = agent_message let combined = agent_message
@@ -183,7 +183,10 @@ pub(crate) async fn handle_non_tool_response_item(
} }
Some(turn_item) Some(turn_item)
} }
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput { .. })
| ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
..
}) => {
debug!("unexpected tool output from stream"); debug!("unexpected tool output from stream");
None None
} }
@@ -195,7 +198,7 @@ pub(crate) fn last_assistant_message_from_item(
item: &ResponseItem, item: &ResponseItem,
plan_mode: bool, plan_mode: bool,
) -> Option<String> { ) -> Option<String> {
if let ResponseItem::Message { role, content, .. } = item if let ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) = item
&& role == "assistant" && role == "assistant"
{ {
let combined = content let combined = content
@@ -220,18 +223,18 @@ pub(crate) fn last_assistant_message_from_item(
pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Option<ResponseItem> { pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Option<ResponseItem> {
match input { match input {
ResponseInputItem::FunctionCallOutput { call_id, output } => { ResponseInputItem::FunctionCallOutput { call_id, output } => Some(
Some(ResponseItem::FunctionCallOutput { ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
call_id: call_id.clone(), call_id: call_id.clone(),
output: output.clone(), output: output.clone(),
}) }),
} ),
ResponseInputItem::CustomToolCallOutput { call_id, output } => { ResponseInputItem::CustomToolCallOutput { call_id, output } => Some(
Some(ResponseItem::CustomToolCallOutput { ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
call_id: call_id.clone(), call_id: call_id.clone(),
output: output.clone(), output: output.clone(),
}) }),
} ),
ResponseInputItem::McpToolCallOutput { call_id, result } => { ResponseInputItem::McpToolCallOutput { call_id, result } => {
let output = match result { let output = match result {
Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result), Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result),
@@ -240,10 +243,12 @@ pub(crate) fn response_input_to_response_item(input: &ResponseInputItem) -> Opti
success: Some(false), success: Some(false),
}, },
}; };
Some(ResponseItem::FunctionCallOutput { Some(ResponseItem::FunctionCallOutput(
call_id: call_id.clone(), codex_protocol::models::FunctionCallOutput {
output, call_id: call_id.clone(),
}) output,
},
))
} }
_ => None, _ => None,
} }

View File

@@ -106,9 +106,9 @@ impl SessionTask for GhostSnapshotTask {
} }
session session
.session .session
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot { .record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot {
ghost_commit: ghost_commit.clone(), ghost_commit: ghost_commit.clone(),
}]) })])
.await; .await;
info!("ghost commit captured: {}", ghost_commit.id()); info!("ghost commit captured: {}", ghost_commit.id());
} }

View File

@@ -270,7 +270,7 @@ impl Session {
.await; .await;
if reason == TurnAbortReason::Interrupted { if reason == TurnAbortReason::Interrupted {
let marker = ResponseItem::Message { let marker = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -280,7 +280,7 @@ impl Session {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref()) self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
.await; .await;
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)]) self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])

View File

@@ -231,13 +231,13 @@ pub(crate) async fn exit_review_mode(
session session
.record_conversation_items( .record_conversation_items(
&ctx, &ctx,
&[ResponseItem::Message { &[ResponseItem::Message(codex_protocol::models::Message {
id: Some(REVIEW_USER_MESSAGE_ID.to_string()), id: Some(REVIEW_USER_MESSAGE_ID.to_string()),
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }], content: vec![ContentItem::InputText { text: user_message }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}], })],
) )
.await; .await;
@@ -250,7 +250,7 @@ pub(crate) async fn exit_review_mode(
session session
.record_response_item_and_emit_turn_item( .record_response_item_and_emit_turn_item(
ctx.as_ref(), ctx.as_ref(),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()), id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()),
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -258,7 +258,7 @@ pub(crate) async fn exit_review_mode(
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
) )
.await; .await;

View File

@@ -77,9 +77,9 @@ impl SessionTask for UndoTask {
.enumerate() .enumerate()
.rev() .rev()
.find_map(|(idx, item)| match item { .find_map(|(idx, item)| match item {
ResponseItem::GhostSnapshot { ghost_commit } => { ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot {
Some((idx, ghost_commit.clone())) ghost_commit,
} }) => Some((idx, ghost_commit.clone())),
_ => None, _ => None,
}) })
else { else {

View File

@@ -298,7 +298,9 @@ async fn persist_user_shell_output(
} }
let response_input_item = match output_item { let response_input_item = match output_item {
ResponseItem::Message { role, content, .. } => ResponseInputItem::Message { role, content }, ResponseItem::Message(codex_protocol::models::Message { role, content, .. }) => {
ResponseInputItem::Message { role, content }
}
_ => unreachable!("user shell command output record should always be a message"), _ => unreachable!("user shell command output record should always be a message"),
}; };

View File

@@ -530,7 +530,7 @@ mod tests {
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
fn user_msg(text: &str) -> ResponseItem { fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -538,10 +538,10 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
fn assistant_msg(text: &str) -> ResponseItem { fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -549,7 +549,7 @@ mod tests {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
#[test] #[test]
@@ -560,20 +560,20 @@ mod tests {
assistant_msg("a2"), assistant_msg("a2"),
user_msg("u2"), user_msg("u2"),
assistant_msg("a3"), assistant_msg("a3"),
ResponseItem::Reasoning { ResponseItem::Reasoning(codex_protocol::models::Reasoning {
id: "r1".to_string(), id: "r1".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText { summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "s".to_string(), text: "s".to_string(),
}], }],
content: None, content: None,
encrypted_content: None, encrypted_content: None,
}, }),
ResponseItem::FunctionCall { ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
id: None, id: None,
call_id: "c1".to_string(), call_id: "c1".to_string(),
name: "tool".to_string(), name: "tool".to_string(),
arguments: "{}".to_string(), arguments: "{}".to_string(),
}, }),
assistant_msg("a4"), assistant_msg("a4"),
]; ];

View File

@@ -1150,15 +1150,17 @@ mod tests {
let thread = manager let thread = manager
.resume_thread_with_history( .resume_thread_with_history(
config, config,
InitialHistory::Forked(vec![RolloutItem::ResponseItem(ResponseItem::Message { InitialHistory::Forked(vec![RolloutItem::ResponseItem(ResponseItem::Message(
id: None, codex_protocol::models::Message {
role: "user".to_string(), id: None,
content: vec![ContentItem::InputText { role: "user".to_string(),
text: "materialized".to_string(), content: vec![ContentItem::InputText {
}], text: "materialized".to_string(),
end_turn: None, }],
phase: None, end_turn: None,
})]), phase: None,
},
))]),
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")), AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
) )
.await .await

View File

@@ -65,12 +65,12 @@ impl ToolRouter {
item: ResponseItem, item: ResponseItem,
) -> Result<Option<ToolCall>, FunctionCallError> { ) -> Result<Option<ToolCall>, FunctionCallError> {
match item { match item {
ResponseItem::FunctionCall { ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
name, name,
arguments, arguments,
call_id, call_id,
.. ..
} => { }) => {
if let Some((server, tool)) = session.parse_mcp_tool_name(&name).await { if let Some((server, tool)) = session.parse_mcp_tool_name(&name).await {
Ok(Some(ToolCall { Ok(Some(ToolCall {
tool_name: name, tool_name: name,
@@ -89,22 +89,22 @@ impl ToolRouter {
})) }))
} }
} }
ResponseItem::CustomToolCall { ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall {
name, name,
input, input,
call_id, call_id,
.. ..
} => Ok(Some(ToolCall { }) => Ok(Some(ToolCall {
tool_name: name, tool_name: name,
call_id, call_id,
payload: ToolPayload::Custom { input }, payload: ToolPayload::Custom { input },
})), })),
ResponseItem::LocalShellCall { ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall {
id, id,
call_id, call_id,
action, action,
.. ..
} => { }) => {
let call_id = call_id let call_id = call_id
.or(id) .or(id)
.ok_or(FunctionCallError::MissingLocalShellCallId)?; .ok_or(FunctionCallError::MissingLocalShellCallId)?;

View File

@@ -56,7 +56,7 @@ pub fn user_shell_command_record_item(
exec_output: &ExecToolCallOutput, exec_output: &ExecToolCallOutput,
turn_context: &TurnContext, turn_context: &TurnContext,
) -> ResponseItem { ) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -64,7 +64,7 @@ pub fn user_shell_command_record_item(
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
#[cfg(test)] #[cfg(test)]
@@ -94,7 +94,7 @@ mod tests {
}; };
let (_, turn_context) = make_session_and_context().await; let (_, turn_context) = make_session_and_context().await;
let item = user_shell_command_record_item("echo hi", &exec_output, &turn_context); let item = user_shell_command_record_item("echo hi", &exec_output, &turn_context);
let ResponseItem::Message { content, .. } = item else { let ResponseItem::Message(codex_protocol::models::Message { content, .. }) = item else {
panic!("expected message"); panic!("expected message");
}; };
let [ContentItem::InputText { text }] = content.as_slice() else { let [ContentItem::InputText { text }] = content.as_slice() else {

View File

@@ -101,7 +101,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
let mut client_session = client.new_session(); let mut client_session = client.new_session();
let mut prompt = Prompt::default(); let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message { prompt.input = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".into(), role: "user".into(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -109,7 +109,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let mut stream = client_session let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None) .stream(&prompt, &model_info, &otel_manager, effort, summary, None)
@@ -205,7 +205,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
let mut client_session = client.new_session(); let mut client_session = client.new_session();
let mut prompt = Prompt::default(); let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message { prompt.input = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".into(), role: "user".into(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -213,7 +213,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let mut stream = client_session let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None) .stream(&prompt, &model_info, &otel_manager, effort, summary, None)
@@ -308,7 +308,7 @@ async fn responses_respects_model_info_overrides_from_config() {
let mut client_session = client.new_session(); let mut client_session = client.new_session();
let mut prompt = Prompt::default(); let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message { prompt.input = vec![ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".into(), role: "user".into(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -316,7 +316,7 @@ async fn responses_respects_model_info_overrides_from_config() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}]; })];
let mut stream = client_session let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None) .stream(&prompt, &model_info, &otel_manager, effort, summary, None)

View File

@@ -187,15 +187,16 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
.unwrap(); .unwrap();
// Prior item: user message (should be delivered) // Prior item: user message (should be delivered)
let prior_user = codex_protocol::models::ResponseItem::Message { let prior_user =
id: None, codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
role: "user".to_string(), id: None,
content: vec![codex_protocol::models::ContentItem::InputText { role: "user".to_string(),
text: "resumed user message".to_string(), content: vec![codex_protocol::models::ContentItem::InputText {
}], text: "resumed user message".to_string(),
end_turn: None, }],
phase: None, end_turn: None,
}; phase: None,
});
let prior_user_json = serde_json::to_value(&prior_user).unwrap(); let prior_user_json = serde_json::to_value(&prior_user).unwrap();
writeln!( writeln!(
f, f,
@@ -209,15 +210,16 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
.unwrap(); .unwrap();
// Prior item: system message (excluded from API history) // Prior item: system message (excluded from API history)
let prior_system = codex_protocol::models::ResponseItem::Message { let prior_system =
id: None, codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
role: "system".to_string(), id: None,
content: vec![codex_protocol::models::ContentItem::OutputText { role: "system".to_string(),
text: "resumed system instruction".to_string(), content: vec![codex_protocol::models::ContentItem::OutputText {
}], text: "resumed system instruction".to_string(),
end_turn: None, }],
phase: None, end_turn: None,
}; phase: None,
});
let prior_system_json = serde_json::to_value(&prior_system).unwrap(); let prior_system_json = serde_json::to_value(&prior_system).unwrap();
writeln!( writeln!(
f, f,
@@ -231,15 +233,16 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
.unwrap(); .unwrap();
// Prior item: assistant message // Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message { let prior_item =
id: None, codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
role: "assistant".to_string(), id: None,
content: vec![codex_protocol::models::ContentItem::OutputText { role: "assistant".to_string(),
text: "resumed assistant message".to_string(), content: vec![codex_protocol::models::ContentItem::OutputText {
}], text: "resumed assistant message".to_string(),
end_turn: None, }],
phase: Some(MessagePhase::Commentary), end_turn: None,
}; phase: Some(MessagePhase::Commentary),
});
let prior_item_json = serde_json::to_value(&prior_item).unwrap(); let prior_item_json = serde_json::to_value(&prior_item).unwrap();
writeln!( writeln!(
f, f,
@@ -1280,66 +1283,82 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
let mut client_session = client.new_session(); let mut client_session = client.new_session();
let mut prompt = Prompt::default(); let mut prompt = Prompt::default();
prompt.input.push(ResponseItem::Reasoning { prompt
id: "reasoning-id".into(), .input
summary: vec![ReasoningItemReasoningSummary::SummaryText { .push(ResponseItem::Reasoning(codex_protocol::models::Reasoning {
text: "summary".into(), id: "reasoning-id".into(),
}], summary: vec![ReasoningItemReasoningSummary::SummaryText {
content: Some(vec![ReasoningItemContent::ReasoningText { text: "summary".into(),
text: "content".into(), }],
}]), content: Some(vec![ReasoningItemContent::ReasoningText {
encrypted_content: None, text: "content".into(),
}); }]),
prompt.input.push(ResponseItem::Message { encrypted_content: None,
id: Some("message-id".into()), }));
role: "assistant".into(), prompt
content: vec![ContentItem::OutputText { .input
text: "message".into(), .push(ResponseItem::Message(codex_protocol::models::Message {
}], id: Some("message-id".into()),
end_turn: None, role: "assistant".into(),
phase: None, content: vec![ContentItem::OutputText {
}); text: "message".into(),
prompt.input.push(ResponseItem::WebSearchCall { }],
id: Some("web-search-id".into()), end_turn: None,
status: Some("completed".into()), phase: None,
action: Some(WebSearchAction::Search { }));
query: Some("weather".into()), prompt.input.push(ResponseItem::WebSearchCall(
queries: None, codex_protocol::models::WebSearchCall {
}), id: Some("web-search-id".into()),
}); status: Some("completed".into()),
prompt.input.push(ResponseItem::FunctionCall { action: Some(WebSearchAction::Search {
id: Some("function-id".into()), query: Some("weather".into()),
name: "do_thing".into(), queries: None,
arguments: "{}".into(), }),
call_id: "function-call-id".into(), },
}); ));
prompt.input.push(ResponseItem::FunctionCallOutput { prompt.input.push(ResponseItem::FunctionCall(
call_id: "function-call-id".into(), codex_protocol::models::FunctionCall {
output: FunctionCallOutputPayload::from_text("ok".into()), id: Some("function-id".into()),
}); name: "do_thing".into(),
prompt.input.push(ResponseItem::LocalShellCall { arguments: "{}".into(),
id: Some("local-shell-id".into()), call_id: "function-call-id".into(),
call_id: Some("local-shell-call-id".into()), },
status: LocalShellStatus::Completed, ));
action: LocalShellAction::Exec(LocalShellExecAction { prompt.input.push(ResponseItem::FunctionCallOutput(
command: vec!["echo".into(), "hello".into()], codex_protocol::models::FunctionCallOutput {
timeout_ms: None, call_id: "function-call-id".into(),
working_directory: None, output: FunctionCallOutputPayload::from_text("ok".into()),
env: None, },
user: None, ));
}), prompt.input.push(ResponseItem::LocalShellCall(
}); codex_protocol::models::LocalShellCall {
prompt.input.push(ResponseItem::CustomToolCall { id: Some("local-shell-id".into()),
id: Some("custom-tool-id".into()), call_id: Some("local-shell-call-id".into()),
status: Some("completed".into()), status: LocalShellStatus::Completed,
call_id: "custom-tool-call-id".into(), action: LocalShellAction::Exec(LocalShellExecAction {
name: "custom_tool".into(), command: vec!["echo".into(), "hello".into()],
input: "{}".into(), timeout_ms: None,
}); working_directory: None,
prompt.input.push(ResponseItem::CustomToolCallOutput { env: None,
call_id: "custom-tool-call-id".into(), user: None,
output: "ok".into(), }),
}); },
));
prompt.input.push(ResponseItem::CustomToolCall(
codex_protocol::models::CustomToolCall {
id: Some("custom-tool-id".into()),
status: Some("completed".into()),
call_id: "custom-tool-call-id".into(),
name: "custom_tool".into(),
input: "{}".into(),
},
));
prompt.input.push(ResponseItem::CustomToolCallOutput(
codex_protocol::models::CustomToolCallOutput {
call_id: "custom-tool-call-id".into(),
output: "ok".into(),
},
));
let mut stream = client_session let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None) .stream(&prompt, &model_info, &otel_manager, effort, summary, None)

View File

@@ -622,13 +622,13 @@ async fn responses_websocket_v2_sets_openai_beta_header() {
} }
fn message_item(text: &str) -> ResponseItem { fn message_item(text: &str) -> ResponseItem {
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".into(), role: "user".into(),
content: vec![ContentItem::InputText { text: text.into() }], content: vec![ContentItem::InputText { text: text.into() }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
fn prompt_with_input(input: Vec<ResponseItem>) -> Prompt { fn prompt_with_input(input: Vec<ResponseItem>) -> Prompt {

View File

@@ -1444,7 +1444,7 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
let remote_summary = "REMOTE_COMPACT_SUMMARY"; let remote_summary = "REMOTE_COMPACT_SUMMARY";
let compacted_history = vec![ let compacted_history = vec![
codex_protocol::models::ResponseItem::Message { codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![codex_protocol::models::ContentItem::OutputText { content: vec![codex_protocol::models::ContentItem::OutputText {
@@ -1452,10 +1452,10 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
codex_protocol::models::ResponseItem::Compaction { codex_protocol::models::ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = let compact_mock =
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
@@ -2266,7 +2266,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
codex_protocol::models::ResponseItem::Message { codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![codex_protocol::models::ContentItem::OutputText { content: vec![codex_protocol::models::ContentItem::OutputText {
@@ -2274,10 +2274,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
codex_protocol::models::ResponseItem::Compaction { codex_protocol::models::ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = let compact_mock =
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
@@ -2386,7 +2386,7 @@ async fn auto_compact_runs_when_reasoning_header_clears_between_turns() {
mount_response_sequence(&server, responses).await; mount_response_sequence(&server, responses).await;
let compacted_history = vec![ let compacted_history = vec![
codex_protocol::models::ResponseItem::Message { codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![codex_protocol::models::ContentItem::OutputText { content: vec![codex_protocol::models::ContentItem::OutputText {
@@ -2394,10 +2394,10 @@ async fn auto_compact_runs_when_reasoning_header_clears_between_turns() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
codex_protocol::models::ResponseItem::Compaction { codex_protocol::models::ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = let compact_mock =
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;

View File

@@ -65,7 +65,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -73,10 +73,10 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
harness.server(), harness.server(),
@@ -184,7 +184,7 @@ async fn remote_compact_runs_automatically() -> Result<()> {
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -192,10 +192,10 @@ async fn remote_compact_runs_automatically() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
harness.server(), harness.server(),
@@ -783,7 +783,7 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -791,10 +791,10 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
harness.server(), harness.server(),
@@ -881,7 +881,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -889,11 +889,11 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -901,7 +901,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
harness.server(), harness.server(),
@@ -946,7 +946,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
let has_compacted_user_summary = replacement_history.iter().any(|item| { let has_compacted_user_summary = replacement_history.iter().any(|item| {
matches!( matches!(
item, item,
ResponseItem::Message { role, content, .. } ResponseItem::Message(codex_protocol::models::Message { role, content, .. })
if role == "user" if role == "user"
&& content.iter().any(|part| matches!( && content.iter().any(|part| matches!(
part, part,
@@ -957,14 +957,14 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
let has_compaction_item = replacement_history.iter().any(|item| { let has_compaction_item = replacement_history.iter().any(|item| {
matches!( matches!(
item, item,
ResponseItem::Compaction { encrypted_content } ResponseItem::Compaction(codex_protocol::models::Compaction { encrypted_content })
if encrypted_content == "ENCRYPTED_COMPACTION_SUMMARY" if encrypted_content == "ENCRYPTED_COMPACTION_SUMMARY"
) )
}); });
let has_compacted_assistant_note = replacement_history.iter().any(|item| { let has_compacted_assistant_note = replacement_history.iter().any(|item| {
matches!( matches!(
item, item,
ResponseItem::Message { role, content, .. } ResponseItem::Message(codex_protocol::models::Message { role, content, .. })
if role == "assistant" if role == "assistant"
&& content.iter().any(|part| matches!( && content.iter().any(|part| matches!(
part, part,
@@ -975,7 +975,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
let has_permissions_developer_message = replacement_history.iter().any(|item| { let has_permissions_developer_message = replacement_history.iter().any(|item| {
matches!( matches!(
item, item,
ResponseItem::Message { role, content, .. } ResponseItem::Message(codex_protocol::models::Message { role, content, .. })
if role == "developer" if role == "developer"
&& content.iter().any(|part| matches!( && content.iter().any(|part| matches!(
part, part,
@@ -1041,7 +1041,7 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -1049,8 +1049,8 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -1058,10 +1058,10 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
&server, &server,
@@ -1182,7 +1182,7 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
.await; .await;
let compacted_history = vec![ let compacted_history = vec![
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -1190,8 +1190,8 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Message { ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -1199,10 +1199,10 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseItem::Compaction { ResponseItem::Compaction(codex_protocol::models::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
}, }),
]; ];
let compact_mock = responses::mount_compact_json_once( let compact_mock = responses::mount_compact_json_once(
&server, &server,

View File

@@ -0,0 +1,73 @@
#![cfg(not(target_os = "windows"))]
use anyhow::Context;
use anyhow::Result;
use codex_core::protocol::EventMsg;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use sha2::Digest;
use sha2::Sha256;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn oversized_user_message_is_stored_as_discoverable_item() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let request_log = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let mut builder = test_codex().with_config(|config| {
config.model_context_window = Some(100);
});
let test = builder.build(&server).await?;
let large_prompt = "oversized user prompt ".repeat(800);
test.codex
.submit(codex_core::protocol::Op::UserInput {
items: vec![UserInput::Text {
text: large_prompt.clone(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let request = request_log.single_request();
let mut hasher = Sha256::new();
hasher.update(large_prompt.as_bytes());
let expected_checksum = format!("{:x}", hasher.finalize());
let discoverable_root = test.codex_home_path().join("discovarable_items");
let expected_path = std::fs::read_dir(&discoverable_root)
.context("read discoverable_items dir")?
.filter_map(|entry| entry.ok())
.map(|entry| entry.path().join("user_message").join(&expected_checksum))
.find(|candidate| candidate.is_file())
.context("expected discoverable file was not created")?;
assert!(expected_path.is_file(), "discoverable file should exist");
let expected = format!(
"User message was too large. Read it from <{}>",
expected_path.display()
);
let actual = request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains(&expected_checksum))
.context("missing discoverable replacement text in user input")?;
assert_eq!(actual, expected);
Ok(())
}

View File

@@ -35,8 +35,11 @@ fn find_user_message_with_image(text: &str) -> Option<ResponseItem> {
Ok(rollout) => rollout, Ok(rollout) => rollout,
Err(_) => continue, Err(_) => continue,
}; };
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = if let RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message {
&rollout.item role,
content,
..
})) = &rollout.item
&& role == "user" && role == "user"
&& content && content
.iter() .iter()
@@ -51,10 +54,12 @@ fn find_user_message_with_image(text: &str) -> Option<ResponseItem> {
fn extract_image_url(item: &ResponseItem) -> Option<String> { fn extract_image_url(item: &ResponseItem) -> Option<String> {
match item { match item {
ResponseItem::Message { content, .. } => content.iter().find_map(|span| match span { ResponseItem::Message(codex_protocol::models::Message { content, .. }) => {
ContentItem::InputImage { image_url } => Some(image_url.clone()), content.iter().find_map(|span| match span {
_ => None, ContentItem::InputImage { image_url } => Some(image_url.clone()),
}), _ => None,
})
}
_ => None, _ => None,
} }
} }
@@ -142,7 +147,7 @@ async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Resu
.expect("expected user message with input image in rollout"); .expect("expected user message with input image in rollout");
let image_url = extract_image_url(&actual).expect("expected image url in rollout"); let image_url = extract_image_url(&actual).expect("expected image url in rollout");
let expected = ResponseItem::Message { let expected = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ content: vec![
@@ -159,7 +164,7 @@ async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Resu
], ],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
assert_eq!(actual, expected); assert_eq!(actual, expected);
@@ -224,7 +229,7 @@ async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()>
.expect("expected user message with input image in rollout"); .expect("expected user message with input image in rollout");
let image_url = extract_image_url(&actual).expect("expected image url in rollout"); let image_url = extract_image_url(&actual).expect("expected image url in rollout");
let expected = ResponseItem::Message { let expected = ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![ content: vec![
@@ -241,7 +246,7 @@ async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()>
], ],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
assert_eq!(actual, expected); assert_eq!(actual, expected);

View File

@@ -70,6 +70,7 @@ mod compact;
mod compact_remote; mod compact_remote;
mod compact_resume_fork; mod compact_resume_fork;
mod deprecation_notice; mod deprecation_notice;
mod discoverable_history;
mod exec; mod exec;
mod exec_policy; mod exec_policy;
mod fork_thread; mod fork_thread;

View File

@@ -61,7 +61,7 @@ fn rollout_developer_texts(text: &str) -> Vec<String> {
Ok(rollout) => rollout, Ok(rollout) => rollout,
Err(_) => continue, Err(_) => continue,
}; };
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = if let RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message { role, content, .. })) =
rollout.item rollout.item
&& role == "developer" && role == "developer"
{ {
@@ -86,7 +86,7 @@ fn rollout_environment_texts(text: &str) -> Vec<String> {
Ok(rollout) => rollout, Ok(rollout) => rollout,
Err(_) => continue, Err(_) => continue,
}; };
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = if let RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message { role, content, .. })) =
rollout.item rollout.item
&& role == "user" && role == "user"
{ {

View File

@@ -130,7 +130,12 @@ async fn review_op_emits_lifecycle_and_review_output() {
} }
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line"); let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line"); let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = rl.item { if let RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message {
role,
content,
..
})) = rl.item
{
if role == "user" { if role == "user" {
for c in content { for c in content {
if let ContentItem::InputText { text } = c { if let ContentItem::InputText { text } = c {
@@ -523,7 +528,7 @@ async fn review_input_isolated_from_parent_history() {
.unwrap(); .unwrap();
// Prior user message (enveloped response_item) // Prior user message (enveloped response_item)
let user = codex_protocol::models::ResponseItem::Message { let user = codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
id: None, id: None,
role: "user".to_string(), role: "user".to_string(),
content: vec![codex_protocol::models::ContentItem::InputText { content: vec![codex_protocol::models::ContentItem::InputText {
@@ -531,7 +536,7 @@ async fn review_input_isolated_from_parent_history() {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
}; });
let user_json = serde_json::to_value(&user).unwrap(); let user_json = serde_json::to_value(&user).unwrap();
let user_line = serde_json::json!({ let user_line = serde_json::json!({
"timestamp": "2024-01-01T00:00:01.000Z", "timestamp": "2024-01-01T00:00:01.000Z",
@@ -543,15 +548,16 @@ async fn review_input_isolated_from_parent_history() {
.unwrap(); .unwrap();
// Prior assistant message (enveloped response_item) // Prior assistant message (enveloped response_item)
let assistant = codex_protocol::models::ResponseItem::Message { let assistant =
id: None, codex_protocol::models::ResponseItem::Message(codex_protocol::models::Message {
role: "assistant".to_string(), id: None,
content: vec![codex_protocol::models::ContentItem::OutputText { role: "assistant".to_string(),
text: "parent: assistant reply".to_string(), content: vec![codex_protocol::models::ContentItem::OutputText {
}], text: "parent: assistant reply".to_string(),
end_turn: None, }],
phase: None, end_turn: None,
}; phase: None,
});
let assistant_json = serde_json::to_value(&assistant).unwrap(); let assistant_json = serde_json::to_value(&assistant).unwrap();
let assistant_line = serde_json::json!({ let assistant_line = serde_json::json!({
"timestamp": "2024-01-01T00:00:02.000Z", "timestamp": "2024-01-01T00:00:02.000Z",
@@ -636,7 +642,11 @@ async fn review_input_isolated_from_parent_history() {
} }
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line"); let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line"); let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = rl.item if let RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message {
role,
content,
..
})) = rl.item
&& role == "user" && role == "user"
{ {
for c in content { for c in content {

View File

@@ -102,13 +102,21 @@ impl OtelManager {
match event { match event {
ResponseEvent::OutputItemDone(item) => { ResponseEvent::OutputItemDone(item) => {
handle_responses_span.record("from", "output_item_done"); handle_responses_span.record("from", "output_item_done");
if let ResponseItem::FunctionCall { name, .. } = &item { if let ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
name,
..
}) = &item
{
handle_responses_span.record("tool_name", name.as_str()); handle_responses_span.record("tool_name", name.as_str());
} }
} }
ResponseEvent::OutputItemAdded(item) => { ResponseEvent::OutputItemAdded(item) => {
handle_responses_span.record("from", "output_item_added"); handle_responses_span.record("from", "output_item_added");
if let ResponseItem::FunctionCall { name, .. } = &item { if let ResponseItem::FunctionCall(codex_protocol::models::FunctionCall {
name,
..
}) = &item
{
handle_responses_span.record("tool_name", name.as_str()); handle_responses_span.record("tool_name", name.as_str());
} }
} }
@@ -755,16 +763,34 @@ impl OtelManager {
fn responses_item_type(item: &ResponseItem) -> String { fn responses_item_type(item: &ResponseItem) -> String {
match item { match item {
ResponseItem::Message { role, .. } => format!("message_from_{role}"), ResponseItem::Message(codex_protocol::models::Message { role, .. }) => {
ResponseItem::Reasoning { .. } => "reasoning".into(), format!("message_from_{role}")
ResponseItem::LocalShellCall { .. } => "local_shell_call".into(), }
ResponseItem::FunctionCall { .. } => "function_call".into(), ResponseItem::Reasoning(codex_protocol::models::Reasoning { .. }) => "reasoning".into(),
ResponseItem::FunctionCallOutput { .. } => "function_call_output".into(), ResponseItem::LocalShellCall(codex_protocol::models::LocalShellCall { .. }) => {
ResponseItem::CustomToolCall { .. } => "custom_tool_call".into(), "local_shell_call".into()
ResponseItem::CustomToolCallOutput { .. } => "custom_tool_call_output".into(), }
ResponseItem::WebSearchCall { .. } => "web_search_call".into(), ResponseItem::FunctionCall(codex_protocol::models::FunctionCall { .. }) => {
ResponseItem::GhostSnapshot { .. } => "ghost_snapshot".into(), "function_call".into()
ResponseItem::Compaction { .. } => "compaction".into(), }
ResponseItem::FunctionCallOutput(codex_protocol::models::FunctionCallOutput {
..
}) => "function_call_output".into(),
ResponseItem::CustomToolCall(codex_protocol::models::CustomToolCall { .. }) => {
"custom_tool_call".into()
}
ResponseItem::CustomToolCallOutput(codex_protocol::models::CustomToolCallOutput {
..
}) => "custom_tool_call_output".into(),
ResponseItem::WebSearchCall(codex_protocol::models::WebSearchCall { .. }) => {
"web_search_call".into()
}
ResponseItem::GhostSnapshot(codex_protocol::models::GhostSnapshot { .. }) => {
"ghost_snapshot".into()
}
ResponseItem::Compaction(codex_protocol::models::Compaction { .. }) => {
"compaction".into()
}
ResponseItem::Other => "other".into(), ResponseItem::Other => "other".into(),
} }
} }

View File

@@ -88,82 +88,125 @@ pub enum MessagePhase {
FinalAnswer, FinalAnswer,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct Message {
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: Option<String>,
pub role: String,
pub content: Vec<ContentItem>,
// Do not use directly, no available consistently across all providers.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub end_turn: Option<bool>,
// Optional output-message phase (for example: "commentary", "final_answer").
// Availability varies by provider/model, so downstream consumers must
// preserve fallback behavior when this is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub phase: Option<MessagePhase>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct Reasoning {
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: String,
pub summary: Vec<ReasoningItemReasoningSummary>,
#[serde(default, skip_serializing_if = "should_serialize_reasoning_content")]
#[ts(optional)]
pub content: Option<Vec<ReasoningItemContent>>,
pub encrypted_content: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct LocalShellCall {
/// Legacy id field retained for compatibility with older payloads.
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: Option<String>,
/// Set when using the Responses API.
pub call_id: Option<String>,
pub status: LocalShellStatus,
pub action: LocalShellAction,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct FunctionCall {
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: Option<String>,
// The Responses API returns the function call arguments as a *string* that contains
// JSON, not as an alreadyparsed object. We keep it as a raw string here and let
// Session::handle_function_call parse it into a Value.
pub name: String,
pub arguments: String,
pub call_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct FunctionCallOutput {
pub call_id: String,
pub output: FunctionCallOutputPayload,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct CustomToolCall {
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub status: Option<String>,
pub call_id: String,
pub name: String,
pub input: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct CustomToolCallOutput {
pub call_id: String,
pub output: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct WebSearchCall {
#[serde(default, skip_serializing)]
#[ts(skip)]
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub action: Option<WebSearchAction>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct GhostSnapshot {
pub ghost_commit: GhostCommit,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
pub struct Compaction {
pub encrypted_content: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "snake_case")] #[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseItem { pub enum ResponseItem {
Message { Message(Message),
#[serde(default, skip_serializing)] Reasoning(Reasoning),
#[ts(skip)] LocalShellCall(LocalShellCall),
id: Option<String>, FunctionCall(FunctionCall),
role: String,
content: Vec<ContentItem>,
// Do not use directly, no available consistently across all providers.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
end_turn: Option<bool>,
// Optional output-message phase (for example: "commentary", "final_answer").
// Availability varies by provider/model, so downstream consumers must
// preserve fallback behavior when this is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
phase: Option<MessagePhase>,
},
Reasoning {
#[serde(default, skip_serializing)]
#[ts(skip)]
id: String,
summary: Vec<ReasoningItemReasoningSummary>,
#[serde(default, skip_serializing_if = "should_serialize_reasoning_content")]
#[ts(optional)]
content: Option<Vec<ReasoningItemContent>>,
encrypted_content: Option<String>,
},
LocalShellCall {
/// Legacy id field retained for compatibility with older payloads.
#[serde(default, skip_serializing)]
#[ts(skip)]
id: Option<String>,
/// Set when using the Responses API.
call_id: Option<String>,
status: LocalShellStatus,
action: LocalShellAction,
},
FunctionCall {
#[serde(default, skip_serializing)]
#[ts(skip)]
id: Option<String>,
name: String,
// The Responses API returns the function call arguments as a *string* that contains
// JSON, not as an alreadyparsed object. We keep it as a raw string here and let
// Session::handle_function_call parse it into a Value.
arguments: String,
call_id: String,
},
// NOTE: The `output` field for `function_call_output` uses a dedicated payload type with // NOTE: The `output` field for `function_call_output` uses a dedicated payload type with
// custom serialization. On the wire it is either: // custom serialization. On the wire it is either:
// - a plain string (`content`) // - a plain string (`content`)
// - an array of structured content items (`content_items`) // - an array of structured content items (`content_items`)
// We keep this behavior centralized in `FunctionCallOutputPayload`. // We keep this behavior centralized in `FunctionCallOutputPayload`.
FunctionCallOutput { FunctionCallOutput(FunctionCallOutput),
call_id: String, CustomToolCall(CustomToolCall),
output: FunctionCallOutputPayload, CustomToolCallOutput(CustomToolCallOutput),
},
CustomToolCall {
#[serde(default, skip_serializing)]
#[ts(skip)]
id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
status: Option<String>,
call_id: String,
name: String,
input: String,
},
CustomToolCallOutput {
call_id: String,
output: String,
},
// Emitted by the Responses API when the agent triggers a web search. // Emitted by the Responses API when the agent triggers a web search.
// Example payload (from SSE `response.output_item.done`): // Example payload (from SSE `response.output_item.done`):
// { // {
@@ -172,25 +215,11 @@ pub enum ResponseItem {
// "status":"completed", // "status":"completed",
// "action": {"type":"search","query":"weather: San Francisco, CA"} // "action": {"type":"search","query":"weather: San Francisco, CA"}
// } // }
WebSearchCall { WebSearchCall(WebSearchCall),
#[serde(default, skip_serializing)]
#[ts(skip)]
id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
action: Option<WebSearchAction>,
},
// Generated by the harness but considered exactly as a model response. // Generated by the harness but considered exactly as a model response.
GhostSnapshot { GhostSnapshot(GhostSnapshot),
ghost_commit: GhostCommit,
},
#[serde(alias = "compaction_summary")] #[serde(alias = "compaction_summary")]
Compaction { Compaction(Compaction),
encrypted_content: String,
},
#[serde(other)] #[serde(other)]
Other, Other,
} }
@@ -459,7 +488,7 @@ fn render_command_prefix(prefix: &[String]) -> String {
impl From<DeveloperInstructions> for ResponseItem { impl From<DeveloperInstructions> for ResponseItem {
fn from(di: DeveloperInstructions) -> Self { fn from(di: DeveloperInstructions) -> Self {
ResponseItem::Message { ResponseItem::Message(Message {
id: None, id: None,
role: "developer".to_string(), role: "developer".to_string(),
content: vec![ContentItem::InputText { content: vec![ContentItem::InputText {
@@ -467,7 +496,7 @@ impl From<DeveloperInstructions> for ResponseItem {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
} }
@@ -619,15 +648,15 @@ pub fn local_image_content_items_with_label_number(
impl From<ResponseInputItem> for ResponseItem { impl From<ResponseInputItem> for ResponseItem {
fn from(item: ResponseInputItem) -> Self { fn from(item: ResponseInputItem) -> Self {
match item { match item {
ResponseInputItem::Message { role, content } => Self::Message { ResponseInputItem::Message { role, content } => Self::Message(Message {
role, role,
content, content,
id: None, id: None,
end_turn: None, end_turn: None,
phase: None, phase: None,
}, }),
ResponseInputItem::FunctionCallOutput { call_id, output } => { ResponseInputItem::FunctionCallOutput { call_id, output } => {
Self::FunctionCallOutput { call_id, output } Self::FunctionCallOutput(FunctionCallOutput { call_id, output })
} }
ResponseInputItem::McpToolCallOutput { call_id, result } => { ResponseInputItem::McpToolCallOutput { call_id, result } => {
let output = match result { let output = match result {
@@ -637,10 +666,10 @@ impl From<ResponseInputItem> for ResponseItem {
success: Some(false), success: Some(false),
}, },
}; };
Self::FunctionCallOutput { call_id, output } Self::FunctionCallOutput(FunctionCallOutput { call_id, output })
} }
ResponseInputItem::CustomToolCallOutput { call_id, output } => { ResponseInputItem::CustomToolCallOutput { call_id, output } => {
Self::CustomToolCallOutput { call_id, output } Self::CustomToolCallOutput(CustomToolCallOutput { call_id, output })
} }
} }
} }
@@ -1462,9 +1491,9 @@ mod tests {
assert_eq!( assert_eq!(
item, item,
ResponseItem::Compaction { ResponseItem::Compaction(crate::models::Compaction {
encrypted_content: "abc".into(), encrypted_content: "abc".into(),
} })
); );
Ok(()) Ok(())
} }
@@ -1540,11 +1569,11 @@ mod tests {
for (json_literal, expected_id, expected_action, expected_status, expect_roundtrip) in cases for (json_literal, expected_id, expected_action, expected_status, expect_roundtrip) in cases
{ {
let parsed: ResponseItem = serde_json::from_str(json_literal)?; let parsed: ResponseItem = serde_json::from_str(json_literal)?;
let expected = ResponseItem::WebSearchCall { let expected = ResponseItem::WebSearchCall(crate::models::WebSearchCall {
id: expected_id.clone(), id: expected_id.clone(),
status: expected_status.clone(), status: expected_status.clone(),
action: expected_action.clone(), action: expected_action.clone(),
}; });
assert_eq!(parsed, expected); assert_eq!(parsed, expected);
let serialized = serde_json::to_value(&parsed)?; let serialized = serde_json::to_value(&parsed)?;

View File

@@ -1720,7 +1720,7 @@ pub struct CompactedItem {
impl From<CompactedItem> for ResponseItem { impl From<CompactedItem> for ResponseItem {
fn from(value: CompactedItem) -> Self { fn from(value: CompactedItem) -> Self {
ResponseItem::Message { ResponseItem::Message(crate::models::Message {
id: None, id: None,
role: "assistant".to_string(), role: "assistant".to_string(),
content: vec![ContentItem::OutputText { content: vec![ContentItem::OutputText {
@@ -1728,7 +1728,7 @@ impl From<CompactedItem> for ResponseItem {
}], }],
end_turn: None, end_turn: None,
phase: None, phase: None,
} })
} }
} }

View File

@@ -137,15 +137,16 @@ mod tests {
#[test] #[test]
fn response_item_user_messages_do_not_set_title_or_first_user_message() { fn response_item_user_messages_do_not_set_title_or_first_user_message() {
let mut metadata = metadata_for_test(); let mut metadata = metadata_for_test();
let item = RolloutItem::ResponseItem(ResponseItem::Message { let item =
id: None, RolloutItem::ResponseItem(ResponseItem::Message(codex_protocol::models::Message {
role: "user".to_string(), id: None,
content: vec![ContentItem::InputText { role: "user".to_string(),
text: "hello from response item".to_string(), content: vec![ContentItem::InputText {
}], text: "hello from response item".to_string(),
end_turn: None, }],
phase: None, end_turn: None,
}); phase: None,
}));
apply_rollout_item(&mut metadata, &item, "test-provider"); apply_rollout_item(&mut metadata, &item, "test-provider");