mirror of
https://github.com/openai/codex.git
synced 2026-05-02 10:26:45 +00:00
Add backend remember-context RPC
Load source rollouts, extract visible conversation, persist a hidden remembered-context packet, and expose the v2 thread/remember RPC for UI integration. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
committed by
Taylor McIntyre
parent
fb547f1a66
commit
f341ca973e
@@ -16,6 +16,7 @@ use crate::outgoing_message::ConnectionRequestId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::outgoing_message::RequestContext;
|
||||
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
|
||||
use crate::thread_remember::build_remembered_context;
|
||||
use crate::thread_status::ThreadWatchManager;
|
||||
use crate::thread_status::resolve_thread_status;
|
||||
use chrono::DateTime;
|
||||
@@ -154,6 +155,8 @@ use codex_app_server_protocol::ThreadRealtimeStartResponse;
|
||||
use codex_app_server_protocol::ThreadRealtimeStartTransport;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopParams;
|
||||
use codex_app_server_protocol::ThreadRealtimeStopResponse;
|
||||
use codex_app_server_protocol::ThreadRememberParams;
|
||||
use codex_app_server_protocol::ThreadRememberResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
@@ -223,6 +226,7 @@ use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_name_by_id;
|
||||
use codex_core::find_thread_names_by_ids;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::load_remembered_conversation_from_rollout;
|
||||
use codex_core::parse_cursor;
|
||||
use codex_core::plugins::MarketplaceError;
|
||||
use codex_core::plugins::MarketplacePluginSource;
|
||||
@@ -235,6 +239,7 @@ use codex_core::plugins::load_plugin_apps;
|
||||
use codex_core::plugins::load_plugin_mcp_servers;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::remembered_context_response_item;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
@@ -883,6 +888,10 @@ impl CodexMessageProcessor {
|
||||
self.thread_read(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadRemember { request_id, params } => {
|
||||
self.thread_remember(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadShellCommand { request_id, params } => {
|
||||
self.thread_shell_command(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -3873,6 +3882,145 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_remember(
|
||||
&mut self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ThreadRememberParams,
|
||||
) {
|
||||
let ThreadRememberParams {
|
||||
thread_id,
|
||||
source_thread_ids,
|
||||
} = params;
|
||||
|
||||
if source_thread_ids.is_empty() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"sourceThreadIds must contain at least one thread id".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let (target_thread_id, target_thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(loaded_thread) => loaded_thread,
|
||||
Err(err) => {
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if matches!(target_thread.agent_status().await, AgentStatus::Running) {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"thread/remember requires an idle target thread".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let mut source_thread_ids_parsed = Vec::with_capacity(source_thread_ids.len());
|
||||
for source_thread_id in source_thread_ids {
|
||||
let source_thread_id = match ThreadId::from_string(&source_thread_id) {
|
||||
Ok(source_thread_id) => source_thread_id,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("invalid source thread id: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if source_thread_id == target_thread_id {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"thread/remember sourceThreadIds must not contain the target thread id"
|
||||
.to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
source_thread_ids_parsed.push(source_thread_id);
|
||||
}
|
||||
|
||||
let mut sources = Vec::with_capacity(source_thread_ids_parsed.len());
|
||||
for source_thread_id in &source_thread_ids_parsed {
|
||||
let rollout_path = match self
|
||||
.find_thread_remember_source_rollout_path(*source_thread_id)
|
||||
.await
|
||||
{
|
||||
Ok(rollout_path) => rollout_path,
|
||||
Err(message) => {
|
||||
self.send_invalid_request_error(request_id, message).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let conversation = match load_remembered_conversation_from_rollout(&rollout_path).await
|
||||
{
|
||||
Ok(conversation) => conversation,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load remembered context from `{}` for thread {source_thread_id}: {err}",
|
||||
rollout_path.display(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
sources.push((source_thread_id.to_string(), conversation));
|
||||
}
|
||||
|
||||
let built_context = build_remembered_context(&sources);
|
||||
let context_item = remembered_context_response_item(built_context.context);
|
||||
|
||||
target_thread.ensure_rollout_materialized().await;
|
||||
target_thread
|
||||
.record_response_item_without_turn(context_item)
|
||||
.await;
|
||||
target_thread.flush_rollout().await;
|
||||
|
||||
let response = ThreadRememberResponse {
|
||||
remembered_thread_ids: source_thread_ids_parsed
|
||||
.into_iter()
|
||||
.map(|thread_id| thread_id.to_string())
|
||||
.collect(),
|
||||
context_preview: built_context.preview,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn find_thread_remember_source_rollout_path(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> Result<PathBuf, String> {
|
||||
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await
|
||||
&& let Some(rollout_path) = thread.rollout_path()
|
||||
{
|
||||
return Ok(rollout_path);
|
||||
}
|
||||
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await {
|
||||
Ok(Some(path)) => Ok(path),
|
||||
Ok(None) => {
|
||||
match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => Ok(path),
|
||||
Ok(None) => Err(format!("source thread not found: {thread_id}")),
|
||||
Err(err) => Err(format!(
|
||||
"failed to locate archived source thread {thread_id}: {err}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
Err(err) => Err(format!("failed to locate source thread {thread_id}: {err}")),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.thread_manager.subscribe_thread_created()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user