mirror of
https://github.com/openai/codex.git
synced 2026-05-01 18:06:47 +00:00
Compare commits
5 Commits
dev/abhina
...
dev/rasmus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4980d00b95 | ||
|
|
48b0b1ac97 | ||
|
|
ce284b587a | ||
|
|
0bfe4b66d7 | ||
|
|
3253d2d972 |
20
codex-rs/Cargo.lock
generated
20
codex-rs/Cargo.lock
generated
@@ -1748,6 +1748,26 @@ dependencies = [
|
||||
"unicode-width 0.2.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-agent-graph-store"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"codex-protocol",
|
||||
"codex-state",
|
||||
"pretty_assertions",
|
||||
"prost 0.14.3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic-prost",
|
||||
"tonic-prost-build",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-agent-identity"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
members = [
|
||||
"aws-auth",
|
||||
"analytics",
|
||||
"agent-graph-store",
|
||||
"agent-identity",
|
||||
"backend-client",
|
||||
"ansi-escape",
|
||||
@@ -113,6 +114,7 @@ license = "Apache-2.0"
|
||||
# Internal
|
||||
app_test_support = { path = "app-server/tests/common" }
|
||||
codex-analytics = { path = "analytics" }
|
||||
codex-agent-graph-store = { path = "agent-graph-store" }
|
||||
codex-agent-identity = { path = "agent-identity" }
|
||||
codex-ansi-escape = { path = "ansi-escape" }
|
||||
codex-api = { path = "codex-api" }
|
||||
@@ -437,6 +439,7 @@ unwrap_used = "deny"
|
||||
# silence the false positive here instead of deleting a real dependency.
|
||||
[workspace.metadata.cargo-shear]
|
||||
ignored = [
|
||||
"codex-agent-graph-store",
|
||||
"icu_provider",
|
||||
"openssl-sys",
|
||||
"codex-utils-readiness",
|
||||
|
||||
6
codex-rs/agent-graph-store/BUILD.bazel
Normal file
6
codex-rs/agent-graph-store/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "agent-graph-store",
|
||||
crate_name = "codex_agent_graph_store",
|
||||
)
|
||||
35
codex-rs/agent-graph-store/Cargo.toml
Normal file
35
codex-rs/agent-graph-store/Cargo.toml
Normal file
@@ -0,0 +1,35 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-agent-graph-store"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "codex_agent_graph_store"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[[example]]
|
||||
name = "generate-proto"
|
||||
path = "examples/generate-proto.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
prost = "0.14.3"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
tonic-prost = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
tokio-stream = { workspace = true, features = ["net"] }
|
||||
tonic = { workspace = true, features = ["router", "transport"] }
|
||||
tonic-prost-build = { version = "=0.14.3", default-features = false, features = ["transport"] }
|
||||
19
codex-rs/agent-graph-store/examples/generate-proto.rs
Normal file
19
codex-rs/agent-graph-store/examples/generate-proto.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(proto_dir_arg) = std::env::args().nth(1) else {
|
||||
eprintln!("Usage: generate-proto <proto-dir>");
|
||||
std::process::exit(1);
|
||||
};
|
||||
|
||||
let proto_dir = PathBuf::from(proto_dir_arg);
|
||||
let proto_file = proto_dir.join("codex.agent_graph_store.v1.proto");
|
||||
|
||||
tonic_prost_build::configure()
|
||||
.build_client(true)
|
||||
.build_server(true)
|
||||
.out_dir(&proto_dir)
|
||||
.compile_protos(&[proto_file], &[proto_dir])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
38
codex-rs/agent-graph-store/scripts/generate-proto.sh
Executable file
38
codex-rs/agent-graph-store/scripts/generate-proto.sh
Executable file
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
repo_root="$(cd "$script_dir/../../.." && pwd)"
|
||||
proto_dir="$repo_root/codex-rs/agent-graph-store/src/remote/proto"
|
||||
generated="$proto_dir/codex.agent_graph_store.v1.rs"
|
||||
tmpdir="$(mktemp -d)"
|
||||
|
||||
cleanup() {
|
||||
rm -rf "$tmpdir"
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
(
|
||||
cd "$repo_root/codex-rs"
|
||||
CARGO_TARGET_DIR="$tmpdir/target" cargo run \
|
||||
-p codex-agent-graph-store \
|
||||
--example generate-proto \
|
||||
-- "$proto_dir"
|
||||
)
|
||||
|
||||
if ! sed -n '2p' "$generated" | grep -q 'clippy::trivially_copy_pass_by_ref'; then
|
||||
{
|
||||
sed -n '1p' "$generated"
|
||||
printf '#![allow(clippy::trivially_copy_pass_by_ref)]\n'
|
||||
sed '1d' "$generated"
|
||||
} > "$tmpdir/generated.rs"
|
||||
mv "$tmpdir/generated.rs" "$generated"
|
||||
fi
|
||||
|
||||
rustfmt --edition 2024 "$generated"
|
||||
|
||||
awk '
|
||||
NR == 3 && previous ~ /clippy::trivially_copy_pass_by_ref/ && $0 != "" { print "" }
|
||||
{ print; previous = $0 }
|
||||
' "$generated" > "$tmpdir/formatted.rs"
|
||||
mv "$tmpdir/formatted.rs" "$generated"
|
||||
20
codex-rs/agent-graph-store/src/error.rs
Normal file
20
codex-rs/agent-graph-store/src/error.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
/// Result type returned by agent graph store operations.
|
||||
pub type AgentGraphStoreResult<T> = Result<T, AgentGraphStoreError>;
|
||||
|
||||
/// Error type shared by agent graph store implementations.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AgentGraphStoreError {
|
||||
/// The caller supplied invalid request data.
|
||||
#[error("invalid agent graph store request: {message}")]
|
||||
InvalidRequest {
|
||||
/// User-facing explanation of the invalid request.
|
||||
message: String,
|
||||
},
|
||||
|
||||
/// Catch-all for implementation failures that do not fit a more specific category.
|
||||
#[error("agent graph store internal error: {message}")]
|
||||
Internal {
|
||||
/// User-facing explanation of the implementation failure.
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
14
codex-rs/agent-graph-store/src/lib.rs
Normal file
14
codex-rs/agent-graph-store/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
//! Storage-neutral parent/child topology for thread-spawned agents.
|
||||
|
||||
mod error;
|
||||
mod local;
|
||||
mod remote;
|
||||
mod store;
|
||||
mod types;
|
||||
|
||||
pub use error::AgentGraphStoreError;
|
||||
pub use error::AgentGraphStoreResult;
|
||||
pub use local::LocalAgentGraphStore;
|
||||
pub use remote::RemoteAgentGraphStore;
|
||||
pub use store::AgentGraphStore;
|
||||
pub use types::ThreadSpawnEdgeStatus;
|
||||
325
codex-rs/agent-graph-store/src/local.rs
Normal file
325
codex-rs/agent-graph-store/src/local.rs
Normal file
@@ -0,0 +1,325 @@
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::StateRuntime;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::AgentGraphStore;
|
||||
use crate::AgentGraphStoreError;
|
||||
use crate::AgentGraphStoreResult;
|
||||
use crate::ThreadSpawnEdgeStatus;
|
||||
|
||||
/// SQLite-backed implementation of [`AgentGraphStore`] using an existing state runtime.
|
||||
#[derive(Clone)]
|
||||
pub struct LocalAgentGraphStore {
|
||||
state_db: Arc<StateRuntime>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LocalAgentGraphStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LocalAgentGraphStore")
|
||||
.field("codex_home", &self.state_db.codex_home())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalAgentGraphStore {
|
||||
/// Create a local graph store from an already-initialized state runtime.
|
||||
pub fn new(state_db: Arc<StateRuntime>) -> Self {
|
||||
Self { state_db }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AgentGraphStore for LocalAgentGraphStore {
|
||||
async fn upsert_thread_spawn_edge(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()> {
|
||||
self.state_db
|
||||
.upsert_thread_spawn_edge(parent_thread_id, child_thread_id, to_state_status(status))
|
||||
.await
|
||||
.map_err(internal_error)
|
||||
}
|
||||
|
||||
async fn set_thread_spawn_edge_status(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()> {
|
||||
self.state_db
|
||||
.set_thread_spawn_edge_status(child_thread_id, to_state_status(status))
|
||||
.await
|
||||
.map_err(internal_error)
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_children(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>> {
|
||||
if let Some(status) = status_filter {
|
||||
return self
|
||||
.state_db
|
||||
.list_thread_spawn_children_with_status(parent_thread_id, to_state_status(status))
|
||||
.await
|
||||
.map_err(internal_error);
|
||||
}
|
||||
|
||||
self.state_db
|
||||
.list_thread_spawn_children(parent_thread_id)
|
||||
.await
|
||||
.map_err(internal_error)
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>> {
|
||||
match status_filter {
|
||||
Some(status) => self
|
||||
.state_db
|
||||
.list_thread_spawn_descendants_with_status(root_thread_id, to_state_status(status))
|
||||
.await
|
||||
.map_err(internal_error),
|
||||
None => self
|
||||
.state_db
|
||||
.list_thread_spawn_descendants(root_thread_id)
|
||||
.await
|
||||
.map_err(internal_error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn to_state_status(status: ThreadSpawnEdgeStatus) -> codex_state::DirectionalThreadSpawnEdgeStatus {
|
||||
match status {
|
||||
ThreadSpawnEdgeStatus::Open => codex_state::DirectionalThreadSpawnEdgeStatus::Open,
|
||||
ThreadSpawnEdgeStatus::Closed => codex_state::DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
}
|
||||
}
|
||||
|
||||
fn internal_error(err: impl std::fmt::Display) -> AgentGraphStoreError {
|
||||
AgentGraphStoreError::Internal {
|
||||
message: err.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
struct TestRuntime {
|
||||
state_db: Arc<StateRuntime>,
|
||||
_codex_home: TempDir,
|
||||
}
|
||||
|
||||
fn thread_id(suffix: u128) -> ThreadId {
|
||||
ThreadId::from_string(&format!("00000000-0000-0000-0000-{suffix:012}"))
|
||||
.expect("valid thread id")
|
||||
}
|
||||
|
||||
async fn state_runtime() -> TestRuntime {
|
||||
let codex_home = TempDir::new().expect("tempdir should be created");
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
TestRuntime {
|
||||
state_db,
|
||||
_codex_home: codex_home,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_store_upserts_and_lists_direct_children_with_status_filters() {
|
||||
let fixture = state_runtime().await;
|
||||
let state_db = fixture.state_db;
|
||||
let store = LocalAgentGraphStore::new(state_db.clone());
|
||||
let parent_thread_id = thread_id(/*suffix*/ 1);
|
||||
let first_child_thread_id = thread_id(/*suffix*/ 2);
|
||||
let second_child_thread_id = thread_id(/*suffix*/ 3);
|
||||
|
||||
store
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
second_child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await
|
||||
.expect("closed child edge should insert");
|
||||
store
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
first_child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("open child edge should insert");
|
||||
|
||||
let all_children = store
|
||||
.list_thread_spawn_children(parent_thread_id, /*status_filter*/ None)
|
||||
.await
|
||||
.expect("all children should load");
|
||||
assert_eq!(
|
||||
all_children,
|
||||
vec![first_child_thread_id, second_child_thread_id]
|
||||
);
|
||||
|
||||
let open_children = store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
.expect("open children should load");
|
||||
let state_open_children = state_db
|
||||
.list_thread_spawn_children_with_status(
|
||||
parent_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("state open children should load");
|
||||
assert_eq!(open_children, state_open_children);
|
||||
assert_eq!(open_children, vec![first_child_thread_id]);
|
||||
|
||||
let closed_children = store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Closed))
|
||||
.await
|
||||
.expect("closed children should load");
|
||||
assert_eq!(closed_children, vec![second_child_thread_id]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_store_updates_edge_status() {
|
||||
let fixture = state_runtime().await;
|
||||
let state_db = fixture.state_db;
|
||||
let store = LocalAgentGraphStore::new(state_db);
|
||||
let parent_thread_id = thread_id(/*suffix*/ 10);
|
||||
let child_thread_id = thread_id(/*suffix*/ 11);
|
||||
|
||||
store
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("child edge should insert");
|
||||
store
|
||||
.set_thread_spawn_edge_status(child_thread_id, ThreadSpawnEdgeStatus::Closed)
|
||||
.await
|
||||
.expect("child edge should close");
|
||||
|
||||
let open_children = store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
.expect("open children should load");
|
||||
assert_eq!(open_children, Vec::<ThreadId>::new());
|
||||
|
||||
let closed_children = store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Closed))
|
||||
.await
|
||||
.expect("closed children should load");
|
||||
assert_eq!(closed_children, vec![child_thread_id]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_store_lists_descendants_breadth_first_with_status_filters() {
|
||||
let fixture = state_runtime().await;
|
||||
let state_db = fixture.state_db;
|
||||
let store = LocalAgentGraphStore::new(state_db.clone());
|
||||
let root_thread_id = thread_id(/*suffix*/ 20);
|
||||
let later_child_thread_id = thread_id(/*suffix*/ 22);
|
||||
let earlier_child_thread_id = thread_id(/*suffix*/ 21);
|
||||
let closed_grandchild_thread_id = thread_id(/*suffix*/ 23);
|
||||
let open_grandchild_thread_id = thread_id(/*suffix*/ 24);
|
||||
let closed_child_thread_id = thread_id(/*suffix*/ 25);
|
||||
let closed_great_grandchild_thread_id = thread_id(/*suffix*/ 26);
|
||||
|
||||
for (parent_thread_id, child_thread_id, status) in [
|
||||
(
|
||||
root_thread_id,
|
||||
later_child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
),
|
||||
(
|
||||
root_thread_id,
|
||||
earlier_child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
),
|
||||
(
|
||||
earlier_child_thread_id,
|
||||
open_grandchild_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
),
|
||||
(
|
||||
later_child_thread_id,
|
||||
closed_grandchild_thread_id,
|
||||
ThreadSpawnEdgeStatus::Closed,
|
||||
),
|
||||
(
|
||||
root_thread_id,
|
||||
closed_child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Closed,
|
||||
),
|
||||
(
|
||||
closed_child_thread_id,
|
||||
closed_great_grandchild_thread_id,
|
||||
ThreadSpawnEdgeStatus::Closed,
|
||||
),
|
||||
] {
|
||||
store
|
||||
.upsert_thread_spawn_edge(parent_thread_id, child_thread_id, status)
|
||||
.await
|
||||
.expect("edge should insert");
|
||||
}
|
||||
|
||||
let all_descendants = store
|
||||
.list_thread_spawn_descendants(root_thread_id, /*status_filter*/ None)
|
||||
.await
|
||||
.expect("all descendants should load");
|
||||
assert_eq!(
|
||||
all_descendants,
|
||||
vec![
|
||||
earlier_child_thread_id,
|
||||
later_child_thread_id,
|
||||
closed_child_thread_id,
|
||||
closed_grandchild_thread_id,
|
||||
open_grandchild_thread_id,
|
||||
closed_great_grandchild_thread_id,
|
||||
]
|
||||
);
|
||||
|
||||
let open_descendants = store
|
||||
.list_thread_spawn_descendants(root_thread_id, Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
.expect("open descendants should load");
|
||||
let state_open_descendants = state_db
|
||||
.list_thread_spawn_descendants_with_status(
|
||||
root_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("state open descendants should load");
|
||||
assert_eq!(open_descendants, state_open_descendants);
|
||||
assert_eq!(
|
||||
open_descendants,
|
||||
vec![
|
||||
earlier_child_thread_id,
|
||||
later_child_thread_id,
|
||||
open_grandchild_thread_id,
|
||||
]
|
||||
);
|
||||
|
||||
let closed_descendants = store
|
||||
.list_thread_spawn_descendants(root_thread_id, Some(ThreadSpawnEdgeStatus::Closed))
|
||||
.await
|
||||
.expect("closed descendants should load");
|
||||
assert_eq!(
|
||||
closed_descendants,
|
||||
vec![closed_child_thread_id, closed_great_grandchild_thread_id]
|
||||
);
|
||||
}
|
||||
}
|
||||
13
codex-rs/agent-graph-store/src/remote/AGENTS.md
Normal file
13
codex-rs/agent-graph-store/src/remote/AGENTS.md
Normal file
@@ -0,0 +1,13 @@
|
||||
# Remote Agent Graph Store
|
||||
|
||||
- The Rust protobuf output in `proto/codex.agent_graph_store.v1.rs` is checked in.
|
||||
- Do not add build-time protobuf generation to `codex-agent-graph-store` unless the Bazel/Cargo story is intentionally changed.
|
||||
- When `proto/codex.agent_graph_store.v1.proto` changes, regenerate the Rust file manually and include both files in the same commit.
|
||||
|
||||
Run this from the repository root:
|
||||
|
||||
```sh
|
||||
./codex-rs/agent-graph-store/scripts/generate-proto.sh
|
||||
```
|
||||
|
||||
The command requires `protoc` to be available on `PATH`.
|
||||
42
codex-rs/agent-graph-store/src/remote/helpers.rs
Normal file
42
codex-rs/agent-graph-store/src/remote/helpers.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
use super::proto;
|
||||
use crate::AgentGraphStoreError;
|
||||
use crate::AgentGraphStoreResult;
|
||||
use crate::ThreadSpawnEdgeStatus;
|
||||
|
||||
pub(super) fn proto_status(status: ThreadSpawnEdgeStatus) -> proto::ThreadSpawnEdgeStatus {
|
||||
match status {
|
||||
ThreadSpawnEdgeStatus::Open => proto::ThreadSpawnEdgeStatus::Open,
|
||||
ThreadSpawnEdgeStatus::Closed => proto::ThreadSpawnEdgeStatus::Closed,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn proto_status_filter(status: Option<ThreadSpawnEdgeStatus>) -> Option<i32> {
|
||||
status.map(proto_status).map(Into::into)
|
||||
}
|
||||
|
||||
pub(super) fn thread_ids_from_proto(
|
||||
thread_ids: Vec<String>,
|
||||
field_name: &str,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>> {
|
||||
thread_ids
|
||||
.into_iter()
|
||||
.map(|thread_id| {
|
||||
ThreadId::from_string(&thread_id).map_err(|err| AgentGraphStoreError::InvalidRequest {
|
||||
message: format!("remote agent graph store returned invalid {field_name}: {err}"),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(super) fn remote_status_to_error(status: tonic::Status) -> AgentGraphStoreError {
|
||||
match status.code() {
|
||||
tonic::Code::InvalidArgument => AgentGraphStoreError::InvalidRequest {
|
||||
message: status.message().to_string(),
|
||||
},
|
||||
_ => AgentGraphStoreError::Internal {
|
||||
message: format!("remote agent graph store request failed: {status}"),
|
||||
},
|
||||
}
|
||||
}
|
||||
339
codex-rs/agent-graph-store/src/remote/mod.rs
Normal file
339
codex-rs/agent-graph-store/src/remote/mod.rs
Normal file
@@ -0,0 +1,339 @@
|
||||
mod helpers;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ThreadId;
|
||||
use proto::agent_graph_store_client::AgentGraphStoreClient;
|
||||
|
||||
use crate::AgentGraphStore;
|
||||
use crate::AgentGraphStoreError;
|
||||
use crate::AgentGraphStoreResult;
|
||||
use crate::ThreadSpawnEdgeStatus;
|
||||
|
||||
#[path = "proto/codex.agent_graph_store.v1.rs"]
|
||||
mod proto;
|
||||
|
||||
/// gRPC-backed [`AgentGraphStore`] implementation for deployments whose durable
|
||||
/// subagent graph lives outside the app-server process.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RemoteAgentGraphStore {
|
||||
endpoint: String,
|
||||
}
|
||||
|
||||
impl RemoteAgentGraphStore {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(
|
||||
&self,
|
||||
) -> AgentGraphStoreResult<AgentGraphStoreClient<tonic::transport::Channel>> {
|
||||
AgentGraphStoreClient::connect(self.endpoint.clone())
|
||||
.await
|
||||
.map_err(|err| AgentGraphStoreError::Internal {
|
||||
message: format!("failed to connect to remote agent graph store: {err}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AgentGraphStore for RemoteAgentGraphStore {
|
||||
async fn upsert_thread_spawn_edge(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()> {
|
||||
let request = proto::UpsertThreadSpawnEdgeRequest {
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
child_thread_id: child_thread_id.to_string(),
|
||||
status: helpers::proto_status(status).into(),
|
||||
};
|
||||
self.client()
|
||||
.await?
|
||||
.upsert_thread_spawn_edge(request)
|
||||
.await
|
||||
.map_err(helpers::remote_status_to_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_thread_spawn_edge_status(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()> {
|
||||
let request = proto::SetThreadSpawnEdgeStatusRequest {
|
||||
child_thread_id: child_thread_id.to_string(),
|
||||
status: helpers::proto_status(status).into(),
|
||||
};
|
||||
self.client()
|
||||
.await?
|
||||
.set_thread_spawn_edge_status(request)
|
||||
.await
|
||||
.map_err(helpers::remote_status_to_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_children(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>> {
|
||||
let response = self
|
||||
.client()
|
||||
.await?
|
||||
.list_thread_spawn_children(proto::ListThreadSpawnChildrenRequest {
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
status_filter: helpers::proto_status_filter(status_filter),
|
||||
})
|
||||
.await
|
||||
.map_err(helpers::remote_status_to_error)?
|
||||
.into_inner();
|
||||
helpers::thread_ids_from_proto(response.thread_ids, "child thread_id")
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>> {
|
||||
let response = self
|
||||
.client()
|
||||
.await?
|
||||
.list_thread_spawn_descendants(proto::ListThreadSpawnDescendantsRequest {
|
||||
root_thread_id: root_thread_id.to_string(),
|
||||
status_filter: helpers::proto_status_filter(status_filter),
|
||||
})
|
||||
.await
|
||||
.map_err(helpers::remote_status_to_error)?
|
||||
.into_inner();
|
||||
helpers::thread_ids_from_proto(response.thread_ids, "descendant thread_id")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::proto;
|
||||
use super::proto::agent_graph_store_server;
|
||||
use super::proto::agent_graph_store_server::AgentGraphStoreServer;
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tonic::Request;
|
||||
use tonic::Response;
|
||||
use tonic::Status;
|
||||
use tonic::transport::Server;
|
||||
|
||||
fn thread_id(suffix: u128) -> ThreadId {
|
||||
ThreadId::from_string(&format!("00000000-0000-0000-0000-{suffix:012}"))
|
||||
.expect("valid thread id")
|
||||
}
|
||||
|
||||
async fn serve_test_server(server: TestServer) -> (RemoteAgentGraphStore, ServerShutdown) {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind test server");
|
||||
let addr = listener.local_addr().expect("test server addr");
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let handle = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(AgentGraphStoreServer::new(server))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
let _ = shutdown_rx.await;
|
||||
},
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
(
|
||||
RemoteAgentGraphStore::new(format!("http://{addr}")),
|
||||
ServerShutdown {
|
||||
shutdown_tx,
|
||||
handle,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
struct ServerShutdown {
|
||||
shutdown_tx: tokio::sync::oneshot::Sender<()>,
|
||||
handle: tokio::task::JoinHandle<Result<(), tonic::transport::Error>>,
|
||||
}
|
||||
|
||||
impl ServerShutdown {
|
||||
async fn shutdown(self) {
|
||||
let _ = self.shutdown_tx.send(());
|
||||
self.handle.await.expect("join server").expect("server");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum TestServer {
|
||||
HappyPath,
|
||||
InvalidThreadIdResponse,
|
||||
InvalidArgumentStatus,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl agent_graph_store_server::AgentGraphStore for TestServer {
|
||||
async fn upsert_thread_spawn_edge(
|
||||
&self,
|
||||
request: Request<proto::UpsertThreadSpawnEdgeRequest>,
|
||||
) -> Result<Response<proto::Empty>, Status> {
|
||||
match self {
|
||||
TestServer::InvalidArgumentStatus => {
|
||||
Err(Status::invalid_argument("status must be specified"))
|
||||
}
|
||||
TestServer::HappyPath | TestServer::InvalidThreadIdResponse => {
|
||||
let request = request.into_inner();
|
||||
assert_eq!(
|
||||
request.parent_thread_id,
|
||||
"00000000-0000-0000-0000-000000000001"
|
||||
);
|
||||
assert_eq!(
|
||||
request.child_thread_id,
|
||||
"00000000-0000-0000-0000-000000000002"
|
||||
);
|
||||
assert_eq!(
|
||||
proto::ThreadSpawnEdgeStatus::try_from(request.status),
|
||||
Ok(proto::ThreadSpawnEdgeStatus::Open)
|
||||
);
|
||||
Ok(Response::new(proto::Empty {}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_thread_spawn_edge_status(
|
||||
&self,
|
||||
request: Request<proto::SetThreadSpawnEdgeStatusRequest>,
|
||||
) -> Result<Response<proto::Empty>, Status> {
|
||||
let request = request.into_inner();
|
||||
assert_eq!(
|
||||
request.child_thread_id,
|
||||
"00000000-0000-0000-0000-000000000002"
|
||||
);
|
||||
assert_eq!(
|
||||
proto::ThreadSpawnEdgeStatus::try_from(request.status),
|
||||
Ok(proto::ThreadSpawnEdgeStatus::Closed)
|
||||
);
|
||||
Ok(Response::new(proto::Empty {}))
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_children(
|
||||
&self,
|
||||
request: Request<proto::ListThreadSpawnChildrenRequest>,
|
||||
) -> Result<Response<proto::ListThreadSpawnChildrenResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
assert_eq!(
|
||||
request.parent_thread_id,
|
||||
"00000000-0000-0000-0000-000000000001"
|
||||
);
|
||||
assert_eq!(
|
||||
request
|
||||
.status_filter
|
||||
.map(proto::ThreadSpawnEdgeStatus::try_from),
|
||||
Some(Ok(proto::ThreadSpawnEdgeStatus::Open))
|
||||
);
|
||||
let thread_ids = match self {
|
||||
TestServer::InvalidThreadIdResponse => vec!["not-a-thread-id".to_string()],
|
||||
TestServer::HappyPath | TestServer::InvalidArgumentStatus => {
|
||||
vec![
|
||||
"00000000-0000-0000-0000-000000000002".to_string(),
|
||||
"00000000-0000-0000-0000-000000000003".to_string(),
|
||||
]
|
||||
}
|
||||
};
|
||||
Ok(Response::new(proto::ListThreadSpawnChildrenResponse {
|
||||
thread_ids,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
request: Request<proto::ListThreadSpawnDescendantsRequest>,
|
||||
) -> Result<Response<proto::ListThreadSpawnDescendantsResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
assert_eq!(
|
||||
request.root_thread_id,
|
||||
"00000000-0000-0000-0000-000000000001"
|
||||
);
|
||||
assert_eq!(request.status_filter, None);
|
||||
Ok(Response::new(proto::ListThreadSpawnDescendantsResponse {
|
||||
thread_ids: vec![
|
||||
"00000000-0000-0000-0000-000000000002".to_string(),
|
||||
"00000000-0000-0000-0000-000000000003".to_string(),
|
||||
"00000000-0000-0000-0000-000000000004".to_string(),
|
||||
],
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_store_calls_agent_graph_service() {
|
||||
let (store, shutdown) = serve_test_server(TestServer::HappyPath).await;
|
||||
let parent_thread_id = thread_id(1);
|
||||
let child_thread_id = thread_id(2);
|
||||
|
||||
store
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
ThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("upsert should succeed");
|
||||
store
|
||||
.set_thread_spawn_edge_status(child_thread_id, ThreadSpawnEdgeStatus::Closed)
|
||||
.await
|
||||
.expect("status update should succeed");
|
||||
|
||||
let children = store
|
||||
.list_thread_spawn_children(parent_thread_id, Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
.expect("children should load");
|
||||
assert_eq!(children, vec![thread_id(2), thread_id(3)]);
|
||||
|
||||
let descendants = store
|
||||
.list_thread_spawn_descendants(parent_thread_id, None)
|
||||
.await
|
||||
.expect("descendants should load");
|
||||
assert_eq!(descendants, vec![thread_id(2), thread_id(3), thread_id(4)]);
|
||||
|
||||
shutdown.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_store_maps_invalid_response_thread_id_to_invalid_request() {
|
||||
let (store, shutdown) = serve_test_server(TestServer::InvalidThreadIdResponse).await;
|
||||
|
||||
let err = store
|
||||
.list_thread_spawn_children(thread_id(1), Some(ThreadSpawnEdgeStatus::Open))
|
||||
.await
|
||||
.expect_err("invalid response thread id should fail");
|
||||
|
||||
assert!(matches!(
|
||||
err,
|
||||
AgentGraphStoreError::InvalidRequest { message } if message.contains("invalid child thread_id")
|
||||
));
|
||||
|
||||
shutdown.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_store_maps_invalid_argument_status_to_invalid_request() {
|
||||
let (store, shutdown) = serve_test_server(TestServer::InvalidArgumentStatus).await;
|
||||
|
||||
let err = store
|
||||
.upsert_thread_spawn_edge(thread_id(1), thread_id(2), ThreadSpawnEdgeStatus::Open)
|
||||
.await
|
||||
.expect_err("invalid argument should fail");
|
||||
|
||||
assert!(matches!(
|
||||
err,
|
||||
AgentGraphStoreError::InvalidRequest { message } if message == "status must be specified"
|
||||
));
|
||||
|
||||
shutdown.shutdown().await;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package codex.agent_graph_store.v1;
|
||||
|
||||
service AgentGraphStore {
|
||||
rpc UpsertThreadSpawnEdge(UpsertThreadSpawnEdgeRequest) returns (Empty);
|
||||
rpc SetThreadSpawnEdgeStatus(SetThreadSpawnEdgeStatusRequest) returns (Empty);
|
||||
rpc ListThreadSpawnChildren(ListThreadSpawnChildrenRequest) returns (ListThreadSpawnChildrenResponse);
|
||||
rpc ListThreadSpawnDescendants(ListThreadSpawnDescendantsRequest) returns (ListThreadSpawnDescendantsResponse);
|
||||
}
|
||||
|
||||
message Empty {}
|
||||
|
||||
message UpsertThreadSpawnEdgeRequest {
|
||||
string parent_thread_id = 1;
|
||||
string child_thread_id = 2;
|
||||
// Servers must reject THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED.
|
||||
ThreadSpawnEdgeStatus status = 3;
|
||||
}
|
||||
|
||||
message SetThreadSpawnEdgeStatusRequest {
|
||||
string child_thread_id = 1;
|
||||
// Servers must reject THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED.
|
||||
ThreadSpawnEdgeStatus status = 2;
|
||||
}
|
||||
|
||||
message ListThreadSpawnChildrenRequest {
|
||||
string parent_thread_id = 1;
|
||||
// Absent means all statuses.
|
||||
optional ThreadSpawnEdgeStatus status_filter = 2;
|
||||
}
|
||||
|
||||
message ListThreadSpawnChildrenResponse {
|
||||
// Ordered child thread ids.
|
||||
repeated string thread_ids = 1;
|
||||
}
|
||||
|
||||
message ListThreadSpawnDescendantsRequest {
|
||||
string root_thread_id = 1;
|
||||
// Absent means all statuses.
|
||||
optional ThreadSpawnEdgeStatus status_filter = 2;
|
||||
}
|
||||
|
||||
message ListThreadSpawnDescendantsResponse {
|
||||
// Ordered descendant thread ids.
|
||||
repeated string thread_ids = 1;
|
||||
}
|
||||
|
||||
enum ThreadSpawnEdgeStatus {
|
||||
THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED = 0;
|
||||
THREAD_SPAWN_EDGE_STATUS_OPEN = 1;
|
||||
THREAD_SPAWN_EDGE_STATUS_CLOSED = 2;
|
||||
}
|
||||
@@ -0,0 +1,569 @@
|
||||
// This file is @generated by prost-build.
|
||||
#![allow(clippy::trivially_copy_pass_by_ref)]
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct Empty {}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct UpsertThreadSpawnEdgeRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub parent_thread_id: ::prost::alloc::string::String,
|
||||
#[prost(string, tag = "2")]
|
||||
pub child_thread_id: ::prost::alloc::string::String,
|
||||
/// Servers must reject THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED.
|
||||
#[prost(enumeration = "ThreadSpawnEdgeStatus", tag = "3")]
|
||||
pub status: i32,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct SetThreadSpawnEdgeStatusRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub child_thread_id: ::prost::alloc::string::String,
|
||||
/// Servers must reject THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED.
|
||||
#[prost(enumeration = "ThreadSpawnEdgeStatus", tag = "2")]
|
||||
pub status: i32,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ListThreadSpawnChildrenRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub parent_thread_id: ::prost::alloc::string::String,
|
||||
/// Absent means all statuses.
|
||||
#[prost(enumeration = "ThreadSpawnEdgeStatus", optional, tag = "2")]
|
||||
pub status_filter: ::core::option::Option<i32>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ListThreadSpawnChildrenResponse {
|
||||
/// Ordered child thread ids.
|
||||
#[prost(string, repeated, tag = "1")]
|
||||
pub thread_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ListThreadSpawnDescendantsRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub root_thread_id: ::prost::alloc::string::String,
|
||||
/// Absent means all statuses.
|
||||
#[prost(enumeration = "ThreadSpawnEdgeStatus", optional, tag = "2")]
|
||||
pub status_filter: ::core::option::Option<i32>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ListThreadSpawnDescendantsResponse {
|
||||
/// Ordered descendant thread ids.
|
||||
#[prost(string, repeated, tag = "1")]
|
||||
pub thread_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
|
||||
}
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
|
||||
#[repr(i32)]
|
||||
pub enum ThreadSpawnEdgeStatus {
|
||||
Unspecified = 0,
|
||||
Open = 1,
|
||||
Closed = 2,
|
||||
}
|
||||
impl ThreadSpawnEdgeStatus {
|
||||
/// String value of the enum field names used in the ProtoBuf definition.
|
||||
///
|
||||
/// The values are not transformed in any way and thus are considered stable
|
||||
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
|
||||
pub fn as_str_name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Unspecified => "THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED",
|
||||
Self::Open => "THREAD_SPAWN_EDGE_STATUS_OPEN",
|
||||
Self::Closed => "THREAD_SPAWN_EDGE_STATUS_CLOSED",
|
||||
}
|
||||
}
|
||||
/// Creates an enum from field names used in the ProtoBuf definition.
|
||||
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
|
||||
match value {
|
||||
"THREAD_SPAWN_EDGE_STATUS_UNSPECIFIED" => Some(Self::Unspecified),
|
||||
"THREAD_SPAWN_EDGE_STATUS_OPEN" => Some(Self::Open),
|
||||
"THREAD_SPAWN_EDGE_STATUS_CLOSED" => Some(Self::Closed),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated client implementations.
|
||||
pub mod agent_graph_store_client {
|
||||
#![allow(
|
||||
unused_variables,
|
||||
dead_code,
|
||||
missing_docs,
|
||||
clippy::wildcard_imports,
|
||||
clippy::let_unit_value
|
||||
)]
|
||||
use tonic::codegen::http::Uri;
|
||||
use tonic::codegen::*;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AgentGraphStoreClient<T> {
|
||||
inner: tonic::client::Grpc<T>,
|
||||
}
|
||||
impl AgentGraphStoreClient<tonic::transport::Channel> {
|
||||
/// Attempt to create a new client by connecting to a given endpoint.
|
||||
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
}
|
||||
impl<T> AgentGraphStoreClient<T>
|
||||
where
|
||||
T: tonic::client::GrpcService<tonic::body::Body>,
|
||||
T::Error: Into<StdError>,
|
||||
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
|
||||
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
|
||||
{
|
||||
pub fn new(inner: T) -> Self {
|
||||
let inner = tonic::client::Grpc::new(inner);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_origin(inner: T, origin: Uri) -> Self {
|
||||
let inner = tonic::client::Grpc::with_origin(inner, origin);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_interceptor<F>(
|
||||
inner: T,
|
||||
interceptor: F,
|
||||
) -> AgentGraphStoreClient<InterceptedService<T, F>>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
T::ResponseBody: Default,
|
||||
T: tonic::codegen::Service<
|
||||
http::Request<tonic::body::Body>,
|
||||
Response = http::Response<
|
||||
<T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
|
||||
>,
|
||||
>,
|
||||
<T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
|
||||
Into<StdError> + std::marker::Send + std::marker::Sync,
|
||||
{
|
||||
AgentGraphStoreClient::new(InterceptedService::new(inner, interceptor))
|
||||
}
|
||||
/// Compress requests with the given encoding.
|
||||
///
|
||||
/// This requires the server to support it otherwise it might respond with an
|
||||
/// error.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.send_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Enable decompressing responses.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.accept_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_decoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_encoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
pub async fn upsert_thread_spawn_edge(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::UpsertThreadSpawnEdgeRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
|
||||
self.inner.ready().await.map_err(|e| {
|
||||
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
|
||||
})?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/UpsertThreadSpawnEdge",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new(
|
||||
"codex.agent_graph_store.v1.AgentGraphStore",
|
||||
"UpsertThreadSpawnEdge",
|
||||
));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn set_thread_spawn_edge_status(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::SetThreadSpawnEdgeStatusRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
|
||||
self.inner.ready().await.map_err(|e| {
|
||||
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
|
||||
})?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/SetThreadSpawnEdgeStatus",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new(
|
||||
"codex.agent_graph_store.v1.AgentGraphStore",
|
||||
"SetThreadSpawnEdgeStatus",
|
||||
));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn list_thread_spawn_children(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::ListThreadSpawnChildrenRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::ListThreadSpawnChildrenResponse>,
|
||||
tonic::Status,
|
||||
> {
|
||||
self.inner.ready().await.map_err(|e| {
|
||||
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
|
||||
})?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/ListThreadSpawnChildren",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new(
|
||||
"codex.agent_graph_store.v1.AgentGraphStore",
|
||||
"ListThreadSpawnChildren",
|
||||
));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn list_thread_spawn_descendants(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::ListThreadSpawnDescendantsRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::ListThreadSpawnDescendantsResponse>,
|
||||
tonic::Status,
|
||||
> {
|
||||
self.inner.ready().await.map_err(|e| {
|
||||
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
|
||||
})?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/ListThreadSpawnDescendants",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new(
|
||||
"codex.agent_graph_store.v1.AgentGraphStore",
|
||||
"ListThreadSpawnDescendants",
|
||||
));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated server implementations.
|
||||
pub mod agent_graph_store_server {
|
||||
#![allow(
|
||||
unused_variables,
|
||||
dead_code,
|
||||
missing_docs,
|
||||
clippy::wildcard_imports,
|
||||
clippy::let_unit_value
|
||||
)]
|
||||
use tonic::codegen::*;
|
||||
/// Generated trait containing gRPC methods that should be implemented for use with AgentGraphStoreServer.
|
||||
#[async_trait]
|
||||
pub trait AgentGraphStore: std::marker::Send + std::marker::Sync + 'static {
|
||||
async fn upsert_thread_spawn_edge(
|
||||
&self,
|
||||
request: tonic::Request<super::UpsertThreadSpawnEdgeRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status>;
|
||||
async fn set_thread_spawn_edge_status(
|
||||
&self,
|
||||
request: tonic::Request<super::SetThreadSpawnEdgeStatusRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status>;
|
||||
async fn list_thread_spawn_children(
|
||||
&self,
|
||||
request: tonic::Request<super::ListThreadSpawnChildrenRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::ListThreadSpawnChildrenResponse>,
|
||||
tonic::Status,
|
||||
>;
|
||||
async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
request: tonic::Request<super::ListThreadSpawnDescendantsRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::ListThreadSpawnDescendantsResponse>,
|
||||
tonic::Status,
|
||||
>;
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct AgentGraphStoreServer<T> {
|
||||
inner: Arc<T>,
|
||||
accept_compression_encodings: EnabledCompressionEncodings,
|
||||
send_compression_encodings: EnabledCompressionEncodings,
|
||||
max_decoding_message_size: Option<usize>,
|
||||
max_encoding_message_size: Option<usize>,
|
||||
}
|
||||
impl<T> AgentGraphStoreServer<T> {
|
||||
pub fn new(inner: T) -> Self {
|
||||
Self::from_arc(Arc::new(inner))
|
||||
}
|
||||
pub fn from_arc(inner: Arc<T>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: Default::default(),
|
||||
send_compression_encodings: Default::default(),
|
||||
max_decoding_message_size: None,
|
||||
max_encoding_message_size: None,
|
||||
}
|
||||
}
|
||||
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
{
|
||||
InterceptedService::new(Self::new(inner), interceptor)
|
||||
}
|
||||
/// Enable decompressing requests with the given encoding.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.accept_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Compress responses with the given encoding, if the client supports it.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.send_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_decoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_encoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
}
|
||||
impl<T, B> tonic::codegen::Service<http::Request<B>> for AgentGraphStoreServer<T>
|
||||
where
|
||||
T: AgentGraphStore,
|
||||
B: Body + std::marker::Send + 'static,
|
||||
B::Error: Into<StdError> + std::marker::Send + 'static,
|
||||
{
|
||||
type Response = http::Response<tonic::body::Body>;
|
||||
type Error = std::convert::Infallible;
|
||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<std::result::Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
||||
match req.uri().path() {
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/UpsertThreadSpawnEdge" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct UpsertThreadSpawnEdgeSvc<T: AgentGraphStore>(pub Arc<T>);
|
||||
impl<T: AgentGraphStore>
|
||||
tonic::server::UnaryService<super::UpsertThreadSpawnEdgeRequest>
|
||||
for UpsertThreadSpawnEdgeSvc<T>
|
||||
{
|
||||
type Response = super::Empty;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::UpsertThreadSpawnEdgeRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as AgentGraphStore>::upsert_thread_spawn_edge(&inner, request)
|
||||
.await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = UpsertThreadSpawnEdgeSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/SetThreadSpawnEdgeStatus" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct SetThreadSpawnEdgeStatusSvc<T: AgentGraphStore>(pub Arc<T>);
|
||||
impl<T: AgentGraphStore>
|
||||
tonic::server::UnaryService<super::SetThreadSpawnEdgeStatusRequest>
|
||||
for SetThreadSpawnEdgeStatusSvc<T>
|
||||
{
|
||||
type Response = super::Empty;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::SetThreadSpawnEdgeStatusRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as AgentGraphStore>::set_thread_spawn_edge_status(
|
||||
&inner, request,
|
||||
)
|
||||
.await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = SetThreadSpawnEdgeStatusSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/ListThreadSpawnChildren" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct ListThreadSpawnChildrenSvc<T: AgentGraphStore>(pub Arc<T>);
|
||||
impl<T: AgentGraphStore>
|
||||
tonic::server::UnaryService<super::ListThreadSpawnChildrenRequest>
|
||||
for ListThreadSpawnChildrenSvc<T>
|
||||
{
|
||||
type Response = super::ListThreadSpawnChildrenResponse;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::ListThreadSpawnChildrenRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as AgentGraphStore>::list_thread_spawn_children(&inner, request)
|
||||
.await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = ListThreadSpawnChildrenSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/codex.agent_graph_store.v1.AgentGraphStore/ListThreadSpawnDescendants" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct ListThreadSpawnDescendantsSvc<T: AgentGraphStore>(pub Arc<T>);
|
||||
impl<T: AgentGraphStore>
|
||||
tonic::server::UnaryService<super::ListThreadSpawnDescendantsRequest>
|
||||
for ListThreadSpawnDescendantsSvc<T>
|
||||
{
|
||||
type Response = super::ListThreadSpawnDescendantsResponse;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::ListThreadSpawnDescendantsRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as AgentGraphStore>::list_thread_spawn_descendants(
|
||||
&inner, request,
|
||||
)
|
||||
.await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = ListThreadSpawnDescendantsSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
_ => Box::pin(async move {
|
||||
let mut response = http::Response::new(tonic::body::Body::default());
|
||||
let headers = response.headers_mut();
|
||||
headers.insert(
|
||||
tonic::Status::GRPC_STATUS,
|
||||
(tonic::Code::Unimplemented as i32).into(),
|
||||
);
|
||||
headers.insert(
|
||||
http::header::CONTENT_TYPE,
|
||||
tonic::metadata::GRPC_CONTENT_TYPE,
|
||||
);
|
||||
Ok(response)
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T> Clone for AgentGraphStoreServer<T> {
|
||||
fn clone(&self) -> Self {
|
||||
let inner = self.inner.clone();
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: self.accept_compression_encodings,
|
||||
send_compression_encodings: self.send_compression_encodings,
|
||||
max_decoding_message_size: self.max_decoding_message_size,
|
||||
max_encoding_message_size: self.max_encoding_message_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated gRPC service name
|
||||
pub const SERVICE_NAME: &str = "codex.agent_graph_store.v1.AgentGraphStore";
|
||||
impl<T> tonic::server::NamedService for AgentGraphStoreServer<T> {
|
||||
const NAME: &'static str = SERVICE_NAME;
|
||||
}
|
||||
}
|
||||
55
codex-rs/agent-graph-store/src/store.rs
Normal file
55
codex-rs/agent-graph-store/src/store.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
use crate::AgentGraphStoreResult;
|
||||
use crate::ThreadSpawnEdgeStatus;
|
||||
|
||||
/// Storage-neutral boundary for persisted thread-spawn parent/child topology.
|
||||
///
|
||||
/// Implementations are expected to return stable ordering for list methods so callers can merge
|
||||
/// persisted graph state with live in-memory state without introducing nondeterministic output.
|
||||
#[async_trait]
|
||||
pub trait AgentGraphStore: Send + Sync {
|
||||
/// Insert or replace the directional parent/child edge for a spawned thread.
|
||||
///
|
||||
/// `child_thread_id` has at most one persisted parent. Re-inserting the same child should
|
||||
/// update both the parent and status to match the supplied values.
|
||||
async fn upsert_thread_spawn_edge(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()>;
|
||||
|
||||
/// Update the persisted lifecycle status of a spawned thread's incoming edge.
|
||||
///
|
||||
/// Implementations should treat missing children as a successful no-op.
|
||||
async fn set_thread_spawn_edge_status(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
status: ThreadSpawnEdgeStatus,
|
||||
) -> AgentGraphStoreResult<()>;
|
||||
|
||||
/// List direct spawned children of a parent thread.
|
||||
///
|
||||
/// When `status_filter` is `Some`, only child edges with that exact status are returned. When
|
||||
/// it is `None`, all direct child edges are returned regardless of status, including statuses
|
||||
/// that may be added by a future store implementation.
|
||||
async fn list_thread_spawn_children(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>>;
|
||||
|
||||
/// List spawned descendants breadth-first by depth, then by thread id.
|
||||
///
|
||||
/// `status_filter` is applied to every traversed edge, not just to the returned descendants.
|
||||
/// For example, `Some(Open)` walks only open edges, so descendants under a closed edge are not
|
||||
/// included even if their own incoming edge is open. `None` walks and returns every persisted
|
||||
/// edge regardless of status.
|
||||
async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
status_filter: Option<ThreadSpawnEdgeStatus>,
|
||||
) -> AgentGraphStoreResult<Vec<ThreadId>>;
|
||||
}
|
||||
42
codex-rs/agent-graph-store/src/types.rs
Normal file
42
codex-rs/agent-graph-store/src/types.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Lifecycle status attached to a directional thread-spawn edge.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ThreadSpawnEdgeStatus {
|
||||
/// The child thread is still live or resumable as an open spawned agent.
|
||||
Open,
|
||||
/// The child thread has been closed from the parent/child graph's perspective.
|
||||
Closed,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn thread_spawn_edge_status_serializes_as_snake_case() {
|
||||
assert_eq!(
|
||||
serde_json::to_string(&ThreadSpawnEdgeStatus::Open)
|
||||
.expect("open status should serialize"),
|
||||
"\"open\""
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_string(&ThreadSpawnEdgeStatus::Closed)
|
||||
.expect("closed status should serialize"),
|
||||
"\"closed\""
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::from_str::<ThreadSpawnEdgeStatus>("\"open\"")
|
||||
.expect("open status should deserialize"),
|
||||
ThreadSpawnEdgeStatus::Open
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::from_str::<ThreadSpawnEdgeStatus>("\"closed\"")
|
||||
.expect("closed status should deserialize"),
|
||||
ThreadSpawnEdgeStatus::Closed
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -134,6 +134,15 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
|
||||
.await
|
||||
}
|
||||
|
||||
/// List all direct spawned children of `parent_thread_id`.
|
||||
pub async fn list_thread_spawn_children(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
) -> anyhow::Result<Vec<ThreadId>> {
|
||||
self.list_thread_spawn_children_matching(parent_thread_id, /*status*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
/// List spawned descendants of `root_thread_id` whose edges match `status`.
|
||||
///
|
||||
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
|
||||
@@ -1871,4 +1880,65 @@ mod tests {
|
||||
.expect("all descendants should load");
|
||||
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_spawn_children_without_status_filter_lists_all_statuses() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let parent_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000910").expect("valid thread id");
|
||||
let open_child_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000911").expect("valid thread id");
|
||||
let closed_child_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000912").expect("valid thread id");
|
||||
let future_child_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000913").expect("valid thread id");
|
||||
|
||||
runtime
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
open_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("open child edge insert should succeed");
|
||||
runtime
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
closed_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await
|
||||
.expect("closed child edge insert should succeed");
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_spawn_edges (
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
status
|
||||
) VALUES (?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(parent_thread_id.to_string())
|
||||
.bind(future_child_thread_id.to_string())
|
||||
.bind("future")
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("future-status child edge insert should succeed");
|
||||
|
||||
let children = runtime
|
||||
.list_thread_spawn_children(parent_thread_id)
|
||||
.await
|
||||
.expect("all children should load");
|
||||
assert_eq!(
|
||||
children,
|
||||
vec![
|
||||
open_child_thread_id,
|
||||
closed_child_thread_id,
|
||||
future_child_thread_id,
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user