chore[multi-agent]: multiple agents running

This commit is contained in:
jif-oai
2025-12-18 14:51:41 +00:00
parent 9c859b4e85
commit b9331372ad
9 changed files with 617 additions and 232 deletions

View File

@@ -151,6 +151,7 @@ use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
@@ -1580,7 +1581,16 @@ impl CodexMessageProcessor {
.await;
return;
}
InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect())
InitialHistory::Forked(
history
.into_iter()
.map(|item| RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
)
} else if let Some(path) = path {
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
@@ -2296,7 +2306,14 @@ impl CodexMessageProcessor {
} else {
match history {
Some(history) if !history.is_empty() => InitialHistory::Forked(
history.into_iter().map(RolloutItem::ResponseItem).collect(),
history
.into_iter()
.map(|item| RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
),
Some(_) | None => {
self.send_invalid_request_error(
@@ -3591,6 +3608,7 @@ mod tests {
let line = RolloutLine {
timestamp: timestamp.clone(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta.clone(),
git: None,

File diff suppressed because it is too large Load Diff

View File

@@ -148,6 +148,7 @@ impl ConversationManager {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
..
} if id == INITIAL_SUBMIT_ID => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
@@ -269,12 +270,16 @@ impl ConversationManager {
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
// Work directly on rollout items, and cut the vector at the nth user message input.
let items: Vec<RolloutItem> = history.get_rollout_items();
let lines: Vec<RolloutLine> = history.get_rollout_lines();
let root_lines: Vec<RolloutLine> = lines
.into_iter()
.filter(|line| line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID) == *DEFAULT_AGENT_ID)
.collect();
// Find indices of user message inputs in rollout order.
let mut user_positions: Vec<usize> = Vec::new();
for (idx, item) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = item
for (idx, line) in root_lines.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = &line.item
&& matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
@@ -291,7 +296,7 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
// Cut strictly before the nth user message (do not keep the nth itself).
let cut_idx = user_positions[n];
let rolled: Vec<RolloutItem> = items.into_iter().take(cut_idx).collect();
let rolled: Vec<RolloutLine> = root_lines.into_iter().take(cut_idx).collect();
if rolled.is_empty() {
InitialHistory::New
@@ -355,13 +360,17 @@ mod tests {
];
// Wrap as InitialHistory::Forked with response items only.
let initial: Vec<RolloutItem> = items
let initial: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
@@ -372,10 +381,14 @@ mod tests {
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
let initial2: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
assert_matches!(truncated2, InitialHistory::New);
@@ -390,14 +403,18 @@ mod tests {
items.push(user_msg("second question"));
items.push(assistant_msg("answer"));
let rollout_items: Vec<RolloutItem> = items
let rollout_items: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected: Vec<RolloutItem> = vec![
RolloutItem::ResponseItem(items[0].clone()),

View File

@@ -26,6 +26,7 @@ use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::default_client::originator;
use crate::git_info::collect_git_info;
use codex_protocol::protocol::AgentId;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
@@ -62,7 +63,10 @@ pub enum RolloutRecorderParams {
}
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
AddItems {
agent_id: String,
items: Vec<RolloutItem>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
@@ -177,7 +181,11 @@ impl RolloutRecorder {
Ok(Self { tx, rollout_path })
}
pub(crate) async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> {
pub(crate) async fn record_items(
&self,
agent_id: &AgentId,
items: &[RolloutItem],
) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
@@ -191,7 +199,10 @@ impl RolloutRecorder {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.send(RolloutCmd::AddItems {
agent_id: agent_id.to_string(),
items: filtered,
})
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
@@ -214,7 +225,7 @@ impl RolloutRecorder {
return Err(IoError::other("empty session file"));
}
let mut items: Vec<RolloutItem> = Vec::new();
let mut lines: Vec<RolloutLine> = Vec::new();
let mut conversation_id: Option<ConversationId> = None;
for line in text.lines() {
if line.trim().is_empty() {
@@ -228,30 +239,17 @@ impl RolloutRecorder {
}
};
// Parse the rollout line structure
match serde_json::from_value::<RolloutLine>(v.clone()) {
Ok(rollout_line) => match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
Ok(rollout_line) => {
if let RolloutItem::SessionMeta(session_meta_line) = &rollout_line.item
&& conversation_id.is_none()
{
// Use the FIRST SessionMeta encountered in the file as the canonical
// conversation id and main session information. Keep all items intact.
if conversation_id.is_none() {
conversation_id = Some(session_meta_line.meta.id);
}
items.push(RolloutItem::SessionMeta(session_meta_line));
conversation_id = Some(session_meta_line.meta.id);
}
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
RolloutItem::TurnContext(item) => {
items.push(RolloutItem::TurnContext(item));
}
RolloutItem::EventMsg(_ev) => {
items.push(RolloutItem::EventMsg(_ev));
}
},
lines.push(rollout_line);
}
Err(e) => {
warn!("failed to parse rollout line: {v:?}, error: {e}");
}
@@ -260,20 +258,20 @@ impl RolloutRecorder {
info!(
"Resumed rollout with {} items, conversation ID: {:?}",
items.len(),
lines.len(),
conversation_id
);
let conversation_id = conversation_id
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
if items.is_empty() {
if lines.is_empty() {
return Ok(InitialHistory::New);
}
info!("Resumed rollout successfully from {path:?}");
Ok(InitialHistory::Resumed(ResumedHistory {
conversation_id,
history: items,
history: lines,
rollout_path: path.to_path_buf(),
}))
}
@@ -364,17 +362,19 @@ async fn rollout_writer(
// Write the SessionMeta as the first item in the file, wrapped in a rollout line
writer
.write_rollout_item(RolloutItem::SessionMeta(session_meta_line))
.write_rollout_item(None, RolloutItem::SessionMeta(session_meta_line))
.await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
RolloutCmd::AddItems { agent_id, items } => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_rollout_item(item).await?;
writer
.write_rollout_item(Some(agent_id.as_str()), item)
.await?;
}
}
}
@@ -400,7 +400,11 @@ struct JsonlWriter {
}
impl JsonlWriter {
async fn write_rollout_item(&mut self, rollout_item: RolloutItem) -> std::io::Result<()> {
async fn write_rollout_item(
&mut self,
agent_id: Option<&str>,
rollout_item: RolloutItem,
) -> std::io::Result<()> {
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
@@ -410,6 +414,7 @@ impl JsonlWriter {
let line = RolloutLine {
timestamp,
agent_id: agent_id.map(str::to_string),
item: rollout_item,
};
self.write_line(&line).await

View File

@@ -109,7 +109,8 @@ impl Session {
input: Vec<UserInput>,
task: T,
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.abort_agent_tasks(&turn_context.agent_id, TurnAbortReason::Replaced)
.await;
let task: Arc<dyn SessionTask> = Arc::new(task);
let task_kind = task.kind();
@@ -152,14 +153,25 @@ impl Session {
cancellation_token,
turn_context: Arc::clone(&turn_context),
};
self.register_new_active_task(running_task).await;
self.register_new_active_task(&turn_context.agent_id, running_task)
.await;
}
pub async fn abort_agent_tasks(self: &Arc<Self>, agent_id: &AgentId, reason: TurnAbortReason) {
for task in self.take_running_tasks_for_agent(agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
self.maybe_close_unified_exec_sessions().await;
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
self.handle_task_abort(task, reason.clone()).await;
for agent in self.agent_states().await {
let agent_id = agent.agent_id.clone();
for task in self.take_running_tasks_for_agent(&agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
}
self.close_unified_exec_sessions().await;
self.maybe_close_unified_exec_sessions().await;
}
pub async fn on_task_finished(
@@ -167,32 +179,31 @@ impl Session {
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
let mut active = self.active_turn.lock().await;
let should_close_sessions = if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
let agent = self.get_or_create_agent(&turn_context.agent_id).await;
{
*active = None;
true
} else {
false
};
drop(active);
if should_close_sessions {
self.close_unified_exec_sessions().await;
let mut active = agent.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
*active = None;
}
}
self.maybe_close_unified_exec_sessions().await;
let event = EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message });
self.send_event(turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {
let mut active = self.active_turn.lock().await;
async fn register_new_active_task(&self, agent_id: &AgentId, task: RunningTask) {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
let mut turn = ActiveTurn::default();
turn.add_task(task);
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
let mut active = self.active_turn.lock().await;
async fn take_running_tasks_for_agent(&self, agent_id: &AgentId) -> Vec<RunningTask> {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
@@ -203,7 +214,10 @@ impl Session {
}
}
async fn close_unified_exec_sessions(&self) {
async fn maybe_close_unified_exec_sessions(&self) {
if self.has_active_tasks().await {
return;
}
self.services
.unified_exec_manager
.terminate_all_sessions()

View File

@@ -20,9 +20,10 @@ These are entities exit on the codex backend. The intent of this section is to e
- `Codex` starts with no `Session`, and it is initialized by `Op::ConfigureSession`, which should be the first message sent by the UI.
- The current `Session` can be reconfigured with additional `Op::ConfigureSession` calls.
- Any running execution is aborted when the session is reconfigured.
- A session hosts one or more agents, each with its own conversation history and turn loop.
3. `Task`
- A `Task` is `Codex` executing work in response to user input.
- `Session` has at most one `Task` running at a time.
- Each agent has at most one `Task` running at a time (tasks can run concurrently across agents).
- Receiving `Op::UserInput` starts a `Task`
- Consists of a series of `Turn`s
- The `Task` executes to until:
@@ -44,7 +45,7 @@ The term "UI" is used to refer to the application driving `Codex`. This may be t
When a `Turn` completes, the `response_id` from the `Model`'s final `response.completed` message is stored in the `Session` state to resume the thread given the next `Op::UserInput`. The `response_id` is also returned in the `EventMsg::TurnComplete` to the UI, which can be used to fork the thread from an earlier point by providing it in the `Op::UserInput`.
Since only 1 `Task` can be run at a time, for parallel tasks it is recommended that a single `Codex` be run for each thread of work.
Since each agent can only run 1 `Task` at a time, parallel tasks should be routed to different agents (or separate Codex sessions).
## Interface
@@ -58,6 +59,7 @@ Since only 1 `Task` can be run at a time, for parallel tasks it is recommended t
- `Event`
- These are messages sent on the `EQ` (`Codex` -> UI)
- Each `Event` has a non-unique ID, matching the `sub_id` from the `Op::UserInput` that started the current task.
- `Event` includes an optional `agent_id`; when omitted, clients should treat it as the default agent (`root`).
- `EventMsg` refers to the enum of all possible `Event` payloads
- This enum is `non_exhaustive`; variants can be added at future dates
- It should be expected that new `EventMsg` variants will be added over time to expose more detailed information about the model's actions.
@@ -66,6 +68,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
- `Op`
- `Op::UserInput` Any input from the user to kick off a `Task`
- `Op::ForAgent` Route a nested `Op` to a specific agent (defaults to the `root` agent when omitted)
- `Op::Interrupt` Interrupts a running task
- `Op::ExecApproval` Approve or deny code execution
- `Op::ListSkills` Request skills for one or more cwd values (optionally `force_reload`)

View File

@@ -6,6 +6,8 @@ use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
pub struct HistoryEntry {
pub conversation_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
pub ts: u64,
pub text: String,
}

View File

@@ -8,6 +8,7 @@ use std::fmt;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;
use crate::ConversationId;
@@ -50,6 +51,9 @@ pub const USER_INSTRUCTIONS_CLOSE_TAG: &str = "</user_instructions>";
pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = "</environment_context>";
pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:";
pub type AgentId = String;
pub static DEFAULT_AGENT_ID: LazyLock<AgentId> = LazyLock::new(|| "root".to_string());
/// Submission Queue Entry - requests from user
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
@@ -66,6 +70,13 @@ pub struct Submission {
#[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum Op {
/// Route an operation to a specific agent.
ForAgent {
/// Identifier of the target agent.
agent_id: AgentId,
/// Operation to route to the agent.
op: Box<Op>,
},
/// Abort current task.
/// This server sends [`EventMsg::TurnAborted`] in response.
Interrupt,
@@ -487,6 +498,9 @@ impl SandboxPolicy {
pub struct Event {
/// Submission `id` that this event is correlated with.
pub id: String,
/// Optional agent identifier associated with this event.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<AgentId>,
/// Payload
pub msg: EventMsg,
}
@@ -1128,7 +1142,7 @@ pub struct ConversationPathResponseEvent {
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<RolloutItem>,
pub history: Vec<RolloutLine>,
pub rollout_path: PathBuf,
}
@@ -1136,37 +1150,63 @@ pub struct ResumedHistory {
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<RolloutItem>),
Forked(Vec<RolloutLine>),
}
impl InitialHistory {
pub fn get_rollout_items(&self) -> Vec<RolloutItem> {
pub fn get_rollout_lines(&self) -> Vec<RolloutLine> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Resumed(resumed) => resumed.history.clone(),
InitialHistory::Forked(items) => items.clone(),
InitialHistory::Forked(lines) => lines.clone(),
}
}
pub fn get_rollout_items_for_agent(&self, agent_id: &AgentId) -> Vec<RolloutItem> {
self.get_rollout_lines()
.into_iter()
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
(line_agent == agent_id).then_some(line.item)
})
.collect()
}
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
self.get_event_msgs_for_agent(&DEFAULT_AGENT_ID)
}
pub fn get_event_msgs_for_agent(&self, agent_id: &AgentId) -> Option<Vec<EventMsg>> {
match self {
InitialHistory::New => None,
InitialHistory::Resumed(resumed) => Some(
resumed
.history
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
InitialHistory::Forked(items) => Some(
items
InitialHistory::Forked(lines) => Some(
lines
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
@@ -1296,9 +1336,11 @@ pub struct TurnContextItem {
pub summary: ReasoningSummaryConfig,
}
#[derive(Serialize, Deserialize, Clone, JsonSchema)]
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema, TS)]
pub struct RolloutLine {
pub timestamp: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<AgentId>,
#[serde(flatten)]
pub item: RolloutItem,
}
@@ -1865,6 +1907,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1234".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "codex-mini-latest".to_string(),
@@ -1924,6 +1967,7 @@ mod tests {
fn serialize_mcp_startup_update_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "srv".to_string(),
status: McpStartupStatus::Failed {
@@ -1944,6 +1988,7 @@ mod tests {
fn serialize_mcp_startup_complete_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupComplete(McpStartupCompleteEvent {
ready: vec!["a".to_string()],
failed: vec![McpStartupFailure {

View File

@@ -25,6 +25,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -1725,6 +1726,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1890,10 +1892,25 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
let Event { id, msg, agent_id } = event;
if !self.should_handle_agent_event(agent_id.as_deref(), &msg) {
return;
}
self.dispatch_event_msg(Some(id), msg, false);
}
fn should_handle_agent_event(&self, agent_id: Option<&str>, msg: &EventMsg) -> bool {
let agent_id = agent_id.unwrap_or(DEFAULT_AGENT_ID);
if agent_id == DEFAULT_AGENT_ID {
return true;
}
matches!(
msg,
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
)
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
///
/// `id` is `Some` for live events and `None` for replayed events from