mirror of
https://github.com/openai/codex.git
synced 2026-02-26 02:33:48 +00:00
Compare commits
29 Commits
dev/cc/new
...
shareable-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18a35e1d50 | ||
|
|
06862a36ea | ||
|
|
f022b4dcdd | ||
|
|
de87a5a7ad | ||
|
|
4525c785d0 | ||
|
|
9df71691d2 | ||
|
|
12f7ffdb66 | ||
|
|
27b5e7c4be | ||
|
|
21db430ab3 | ||
|
|
4db264bb7d | ||
|
|
37a68eeb05 | ||
|
|
e2e83dce09 | ||
|
|
68730de96d | ||
|
|
726369eaa6 | ||
|
|
9c6cc81ef4 | ||
|
|
ced5d77ad2 | ||
|
|
05cae8a1ec | ||
|
|
ffa7644f80 | ||
|
|
8d542f890f | ||
|
|
23855a07f3 | ||
|
|
99f190bf91 | ||
|
|
0c2c47aa29 | ||
|
|
0f3c89d02b | ||
|
|
cd265a3093 | ||
|
|
275a6b7300 | ||
|
|
45ce36de1f | ||
|
|
16cfee1935 | ||
|
|
f4c955af57 | ||
|
|
4e199e6aeb |
176
MODULE.bazel.lock
generated
176
MODULE.bazel.lock
generated
File diff suppressed because one or more lines are too long
1037
codex-rs/Cargo.lock
generated
1037
codex-rs/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -152,6 +152,8 @@ async-channel = "2.3.1"
|
||||
async-stream = "0.3.6"
|
||||
async-trait = "0.1.89"
|
||||
axum = { version = "0.8", default-features = false }
|
||||
azure_core = "0.21"
|
||||
azure_identity = "0.21"
|
||||
base64 = "0.22.1"
|
||||
bm25 = "2.3.2"
|
||||
bytes = "1.10.1"
|
||||
|
||||
@@ -23,6 +23,8 @@ arc-swap = "1.8.0"
|
||||
async-channel = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
askama = { workspace = true }
|
||||
azure_core = { workspace = true }
|
||||
azure_identity = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
bm25 = { workspace = true }
|
||||
chardetng = { workspace = true }
|
||||
@@ -102,7 +104,7 @@ tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { workspace = true, features = ["rt"] }
|
||||
tokio-util = { workspace = true, features = ["io", "rt"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
toml_edit = { workspace = true }
|
||||
|
||||
@@ -1685,6 +1685,17 @@
|
||||
],
|
||||
"description": "Sandbox configuration to apply if `sandbox` is `WorkspaceWrite`."
|
||||
},
|
||||
"session_object_storage_url": {
|
||||
"description": "Base URL for the object store used by /share (enterprise/self-hosted).",
|
||||
"type": "string"
|
||||
},
|
||||
"session_object_storage_url_cmd": {
|
||||
"description": "Optional command to produce the base URL for the object store used by /share. The command runs as argv and must print a non-empty URL to stdout.",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"shell_environment_policy": {
|
||||
"allOf": [
|
||||
{
|
||||
|
||||
@@ -78,6 +78,8 @@ use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
#[cfg(test)]
|
||||
use tempfile::tempdir;
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
@@ -109,6 +111,7 @@ pub use codex_git::GhostSnapshotConfig;
|
||||
/// the context window.
|
||||
pub(crate) const PROJECT_DOC_MAX_BYTES: usize = 32 * 1024; // 32 KiB
|
||||
pub(crate) const DEFAULT_AGENT_MAX_THREADS: Option<usize> = Some(6);
|
||||
const SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
pub const CONFIG_TOML_FILE: &str = "config.toml";
|
||||
|
||||
@@ -346,6 +349,11 @@ pub struct Config {
|
||||
/// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
|
||||
pub chatgpt_base_url: String,
|
||||
|
||||
/// Optional base URL for storing shared session rollouts in an object store.
|
||||
pub session_object_storage_url: Option<String>,
|
||||
/// Optional command to produce the base URL for storing shared session rollouts.
|
||||
pub session_object_storage_url_cmd: Option<Vec<String>>,
|
||||
|
||||
/// When set, restricts ChatGPT login to a specific workspace identifier.
|
||||
pub forced_chatgpt_workspace_id: Option<String>,
|
||||
|
||||
@@ -947,6 +955,12 @@ pub struct ConfigToml {
|
||||
/// When unset, Codex will bind to an ephemeral port chosen by the OS.
|
||||
pub mcp_oauth_callback_port: Option<u16>,
|
||||
|
||||
/// Base URL for the object store used by /share (enterprise/self-hosted).
|
||||
pub session_object_storage_url: Option<String>,
|
||||
/// Optional command to produce the base URL for the object store used by /share.
|
||||
/// The command runs as argv and must print a non-empty URL to stdout.
|
||||
pub session_object_storage_url_cmd: Option<Vec<String>>,
|
||||
|
||||
/// User-defined provider entries that extend/override the built-in list.
|
||||
#[serde(default)]
|
||||
pub model_providers: HashMap<String, ModelProviderInfo>,
|
||||
@@ -1508,6 +1522,15 @@ impl Config {
|
||||
Some(WindowsSandboxModeToml::Unelevated) => WindowsSandboxLevel::RestrictedToken,
|
||||
None => WindowsSandboxLevel::from_features(&features),
|
||||
};
|
||||
let session_object_storage_url =
|
||||
cfg.session_object_storage_url.as_ref().and_then(|value| {
|
||||
let trimmed = value.trim();
|
||||
if trimmed.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(trimmed.to_string())
|
||||
}
|
||||
});
|
||||
let mut sandbox_policy = cfg.derive_sandbox_policy(
|
||||
sandbox_mode,
|
||||
config_profile.sandbox_mode,
|
||||
@@ -1815,6 +1838,8 @@ impl Config {
|
||||
.chatgpt_base_url
|
||||
.or(cfg.chatgpt_base_url)
|
||||
.unwrap_or("https://chatgpt.com/backend-api/".to_string()),
|
||||
session_object_storage_url,
|
||||
session_object_storage_url_cmd: cfg.session_object_storage_url_cmd,
|
||||
forced_chatgpt_workspace_id,
|
||||
forced_login_method,
|
||||
include_apply_patch_tool: include_apply_patch_tool_flag,
|
||||
@@ -1881,6 +1906,35 @@ impl Config {
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub async fn resolve_session_object_storage_url(&self) -> std::io::Result<Option<String>> {
|
||||
if let Some(url) = self.session_object_storage_url.as_ref() {
|
||||
return Ok(Some(url.clone()));
|
||||
}
|
||||
|
||||
let Some(command) = self.session_object_storage_url_cmd.clone() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut handle = tokio::task::spawn_blocking(move || {
|
||||
Self::try_run_command_for_value(Some(&command), "session_object_storage_url_cmd")
|
||||
});
|
||||
match tokio::time::timeout(SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT, &mut handle).await {
|
||||
Ok(result) => result.map_err(|err| {
|
||||
std::io::Error::other(format!("session_object_storage_url_cmd panicked: {err}"))
|
||||
})?,
|
||||
Err(_) => {
|
||||
handle.abort();
|
||||
Err(std::io::Error::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!(
|
||||
"session_object_storage_url_cmd timed out after {}s",
|
||||
SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT.as_secs()
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn load_instructions(codex_dir: Option<&Path>) -> Option<String> {
|
||||
let base = codex_dir?;
|
||||
for candidate in [LOCAL_PROJECT_DOC_FILENAME, DEFAULT_PROJECT_DOC_FILENAME] {
|
||||
@@ -1925,6 +1979,59 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
fn try_run_command_for_value(
|
||||
command: Option<&[String]>,
|
||||
context: &str,
|
||||
) -> std::io::Result<Option<String>> {
|
||||
let Some(command) = command else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if command.is_empty() {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("{context} is empty"),
|
||||
));
|
||||
}
|
||||
|
||||
let mut cmd = Command::new(&command[0]);
|
||||
if let Some(args) = command.get(1..) {
|
||||
cmd.args(args);
|
||||
}
|
||||
|
||||
let output = cmd.output().map_err(|err| {
|
||||
std::io::Error::new(err.kind(), format!("{context} failed to start: {err}"))
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let status = output.status;
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
let stderr = stderr.trim();
|
||||
let message = if stderr.is_empty() {
|
||||
format!("{context} failed with status {status}")
|
||||
} else {
|
||||
format!("{context} failed with status {status}: {stderr}")
|
||||
};
|
||||
return Err(std::io::Error::other(message));
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8(output.stdout).map_err(|err| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("{context} output was not UTF-8: {err}"),
|
||||
)
|
||||
})?;
|
||||
let trimmed = stdout.trim();
|
||||
if trimmed.is_empty() {
|
||||
Err(std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("{context} returned empty output"),
|
||||
))
|
||||
} else {
|
||||
Ok(Some(trimmed.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_windows_sandbox_enabled(&mut self, value: bool) {
|
||||
self.permissions.windows_sandbox_mode = if value {
|
||||
Some(WindowsSandboxModeToml::Unelevated)
|
||||
@@ -4133,6 +4240,8 @@ model_verbosity = "high"
|
||||
model_verbosity: None,
|
||||
personality: Some(Personality::Pragmatic),
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
session_object_storage_url: None,
|
||||
session_object_storage_url_cmd: None,
|
||||
base_instructions: None,
|
||||
developer_instructions: None,
|
||||
compact_prompt: None,
|
||||
@@ -4244,6 +4353,8 @@ model_verbosity = "high"
|
||||
model_verbosity: None,
|
||||
personality: Some(Personality::Pragmatic),
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
session_object_storage_url: None,
|
||||
session_object_storage_url_cmd: None,
|
||||
base_instructions: None,
|
||||
developer_instructions: None,
|
||||
compact_prompt: None,
|
||||
@@ -4353,6 +4464,8 @@ model_verbosity = "high"
|
||||
model_verbosity: None,
|
||||
personality: Some(Personality::Pragmatic),
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
session_object_storage_url: None,
|
||||
session_object_storage_url_cmd: None,
|
||||
base_instructions: None,
|
||||
developer_instructions: None,
|
||||
compact_prompt: None,
|
||||
@@ -4448,6 +4561,8 @@ model_verbosity = "high"
|
||||
model_verbosity: Some(Verbosity::High),
|
||||
personality: Some(Personality::Pragmatic),
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
session_object_storage_url: None,
|
||||
session_object_storage_url_cmd: None,
|
||||
base_instructions: None,
|
||||
developer_instructions: None,
|
||||
compact_prompt: None,
|
||||
|
||||
@@ -57,6 +57,7 @@ mod proposed_plan_parser;
|
||||
mod sandbox_tags;
|
||||
pub mod sandboxing;
|
||||
mod session_prefix;
|
||||
pub mod session_share;
|
||||
mod shell_detect;
|
||||
mod stream_events_utils;
|
||||
mod tagged_block_parser;
|
||||
|
||||
882
codex-rs/core/src/session_share.rs
Normal file
882
codex-rs/core/src/session_share.rs
Normal file
@@ -0,0 +1,882 @@
|
||||
use std::ffi::OsStr;
|
||||
use std::ffi::OsString;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use azure_core::auth::TokenCredential;
|
||||
use azure_identity::AzureCliCredential;
|
||||
use codex_protocol::ThreadId;
|
||||
use futures::StreamExt;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::Url;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
const SHARE_OBJECT_PREFIX: &str = "sessions";
|
||||
const SHARE_OBJECT_SUFFIX: &str = ".jsonl";
|
||||
const SHARE_META_SUFFIX: &str = ".meta.json";
|
||||
const AZURE_STORAGE_SCOPE: &str = "https://storage.azure.com/.default";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SessionShareResult {
|
||||
pub remote_id: ThreadId,
|
||||
pub object_url: Url,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum SessionObjectStore {
|
||||
Http(HttpObjectStore),
|
||||
Azure(AzureObjectStore),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct HttpObjectStore {
|
||||
base_url: Url,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct AzureObjectStore {
|
||||
endpoint: Url,
|
||||
container: String,
|
||||
prefix: String,
|
||||
sas_query: Option<String>,
|
||||
client: reqwest::Client,
|
||||
credential: Option<Arc<dyn TokenCredential>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct SessionShareMeta {
|
||||
owner: String,
|
||||
created_at: i64,
|
||||
updated_at: i64,
|
||||
}
|
||||
|
||||
impl SessionObjectStore {
|
||||
pub async fn new(base_url: &str) -> anyhow::Result<Self> {
|
||||
let mut url = Url::parse(base_url)
|
||||
.with_context(|| format!("invalid session_object_storage_url: {base_url}"))?;
|
||||
match url.scheme() {
|
||||
"az" => Ok(SessionObjectStore::Azure(AzureObjectStore::new_from_az(
|
||||
&url,
|
||||
)?)),
|
||||
"http" | "https" => {
|
||||
if is_azure_blob_url(&url) {
|
||||
Ok(SessionObjectStore::Azure(AzureObjectStore::new(&url)?))
|
||||
} else {
|
||||
ensure_trailing_slash(&mut url);
|
||||
Ok(SessionObjectStore::Http(HttpObjectStore {
|
||||
base_url: url,
|
||||
client: reqwest::Client::new(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
other => Err(anyhow::anyhow!(
|
||||
"unsupported session_object_storage_url scheme {other}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn object_url(&self, key: &str) -> anyhow::Result<Url> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.object_url(key),
|
||||
SessionObjectStore::Azure(store) => store.object_url(key),
|
||||
}
|
||||
}
|
||||
|
||||
async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.object_exists(key).await,
|
||||
SessionObjectStore::Azure(store) => store.object_exists(key).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.put_object(key, data, content_type).await,
|
||||
SessionObjectStore::Azure(store) => store.put_object(key, data, content_type).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_if_absent(
|
||||
&self,
|
||||
key: &str,
|
||||
data: Vec<u8>,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => {
|
||||
store.put_object_if_absent(key, data, content_type).await
|
||||
}
|
||||
SessionObjectStore::Azure(store) => {
|
||||
store.put_object_if_absent(key, data, content_type).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_file(
|
||||
&self,
|
||||
key: &str,
|
||||
path: &Path,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.put_object_file(key, path, content_type).await,
|
||||
SessionObjectStore::Azure(store) => {
|
||||
store.put_object_file(key, path, content_type).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.get_object_bytes(key).await,
|
||||
SessionObjectStore::Azure(store) => store.get_object_bytes(key).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
|
||||
match self {
|
||||
SessionObjectStore::Http(store) => store.get_object_to_file(key, path).await,
|
||||
SessionObjectStore::Azure(store) => store.get_object_to_file(key, path).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload_rollout_with_owner(
|
||||
base_url: &str,
|
||||
session_id: ThreadId,
|
||||
owner: &str,
|
||||
rollout_path: &Path,
|
||||
) -> anyhow::Result<SessionShareResult> {
|
||||
let store = SessionObjectStore::new(base_url).await?;
|
||||
let key = object_key(session_id);
|
||||
let meta_key = meta_key(session_id);
|
||||
let rollout_exists = store.object_exists(&key).await?;
|
||||
let now = OffsetDateTime::now_utc().unix_timestamp();
|
||||
let meta = fetch_meta(&store, &meta_key).await?;
|
||||
|
||||
match (rollout_exists, meta) {
|
||||
(true, Some(meta)) => {
|
||||
if meta.owner != owner {
|
||||
return Err(anyhow::anyhow!(
|
||||
"remote session already exists and belongs to another user"
|
||||
));
|
||||
}
|
||||
store
|
||||
.put_object_file(&key, rollout_path, "application/x-ndjson")
|
||||
.await
|
||||
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
|
||||
let updated = SessionShareMeta {
|
||||
owner: meta.owner,
|
||||
created_at: meta.created_at,
|
||||
updated_at: now,
|
||||
};
|
||||
upload_meta(&store, &meta_key, &updated).await?;
|
||||
}
|
||||
(true, None) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"remote session already exists but has no metadata; refusing to overwrite"
|
||||
));
|
||||
}
|
||||
(false, Some(meta)) => {
|
||||
if meta.owner != owner {
|
||||
return Err(anyhow::anyhow!(
|
||||
"remote session metadata already exists and belongs to another user"
|
||||
));
|
||||
}
|
||||
store
|
||||
.put_object_file(&key, rollout_path, "application/x-ndjson")
|
||||
.await
|
||||
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
|
||||
let updated = SessionShareMeta {
|
||||
owner: meta.owner,
|
||||
created_at: meta.created_at,
|
||||
updated_at: now,
|
||||
};
|
||||
upload_meta(&store, &meta_key, &updated).await?;
|
||||
}
|
||||
(false, None) => {
|
||||
let meta = SessionShareMeta {
|
||||
owner: owner.to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
};
|
||||
let created = create_meta_if_absent(&store, &meta_key, &meta).await?;
|
||||
if created {
|
||||
store
|
||||
.put_object_file(&key, rollout_path, "application/x-ndjson")
|
||||
.await
|
||||
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
|
||||
} else {
|
||||
let meta = fetch_meta(&store, &meta_key).await?.ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to atomically create share metadata; try again")
|
||||
})?;
|
||||
if meta.owner != owner {
|
||||
return Err(anyhow::anyhow!(
|
||||
"remote session metadata already exists and belongs to another user"
|
||||
));
|
||||
}
|
||||
store
|
||||
.put_object_file(&key, rollout_path, "application/x-ndjson")
|
||||
.await
|
||||
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
|
||||
let updated = SessionShareMeta {
|
||||
owner: meta.owner,
|
||||
created_at: meta.created_at,
|
||||
updated_at: now,
|
||||
};
|
||||
upload_meta(&store, &meta_key, &updated).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let object_url = store.object_url(&key)?;
|
||||
Ok(SessionShareResult {
|
||||
remote_id: session_id,
|
||||
object_url,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn download_rollout_if_available(
|
||||
base_url: &str,
|
||||
session_id: ThreadId,
|
||||
codex_home: &Path,
|
||||
) -> anyhow::Result<Option<PathBuf>> {
|
||||
let store = SessionObjectStore::new(base_url).await?;
|
||||
let key = object_key(session_id);
|
||||
let meta_key = meta_key(session_id);
|
||||
let path = build_rollout_download_path(codex_home, session_id)?;
|
||||
if !store.get_object_to_file(&key, &path).await? {
|
||||
return Ok(None);
|
||||
}
|
||||
let meta_path = share_meta_path_for_rollout_path(&path);
|
||||
let meta = match fetch_meta(&store, &meta_key).await {
|
||||
Ok(meta) => meta,
|
||||
Err(err) => {
|
||||
cleanup_downloaded_rollout(&path, &meta_path).await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
match meta {
|
||||
Some(meta) => {
|
||||
let payload =
|
||||
serde_json::to_vec(&meta).with_context(|| "failed to serialize metadata")?;
|
||||
if let Err(err) = tokio::fs::write(&meta_path, payload)
|
||||
.await
|
||||
.with_context(|| format!("failed to write share metadata {}", meta_path.display()))
|
||||
{
|
||||
cleanup_downloaded_rollout(&path, &meta_path).await;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let _ = tokio::fs::remove_file(&meta_path).await;
|
||||
}
|
||||
}
|
||||
Ok(Some(path))
|
||||
}
|
||||
|
||||
fn object_key(id: ThreadId) -> String {
|
||||
format!("{SHARE_OBJECT_PREFIX}/{id}{SHARE_OBJECT_SUFFIX}")
|
||||
}
|
||||
|
||||
fn meta_key(id: ThreadId) -> String {
|
||||
format!("{SHARE_OBJECT_PREFIX}/{id}{SHARE_META_SUFFIX}")
|
||||
}
|
||||
|
||||
pub fn local_share_owner(rollout_path: &Path) -> anyhow::Result<Option<String>> {
|
||||
let meta_path = share_meta_path_for_rollout_path(rollout_path);
|
||||
let bytes = match std::fs::read(&meta_path) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.with_context(|| format!("failed to read share metadata {}", meta_path.display()));
|
||||
}
|
||||
};
|
||||
let meta: SessionShareMeta =
|
||||
serde_json::from_slice(&bytes).with_context(|| "failed to parse session share metadata")?;
|
||||
Ok(Some(meta.owner))
|
||||
}
|
||||
|
||||
async fn fetch_meta(
|
||||
store: &SessionObjectStore,
|
||||
key: &str,
|
||||
) -> anyhow::Result<Option<SessionShareMeta>> {
|
||||
let Some(bytes) = store.get_object_bytes(key).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let meta: SessionShareMeta =
|
||||
serde_json::from_slice(&bytes).with_context(|| "failed to parse session share metadata")?;
|
||||
Ok(Some(meta))
|
||||
}
|
||||
|
||||
async fn upload_meta(
|
||||
store: &SessionObjectStore,
|
||||
key: &str,
|
||||
meta: &SessionShareMeta,
|
||||
) -> anyhow::Result<()> {
|
||||
let payload = serde_json::to_vec(meta).with_context(|| "failed to serialize metadata")?;
|
||||
store.put_object(key, payload, "application/json").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_meta_if_absent(
|
||||
store: &SessionObjectStore,
|
||||
key: &str,
|
||||
meta: &SessionShareMeta,
|
||||
) -> anyhow::Result<bool> {
|
||||
let payload = serde_json::to_vec(meta).with_context(|| "failed to serialize metadata")?;
|
||||
store
|
||||
.put_object_if_absent(key, payload, "application/json")
|
||||
.await
|
||||
}
|
||||
|
||||
fn build_rollout_download_path(codex_home: &Path, session_id: ThreadId) -> anyhow::Result<PathBuf> {
|
||||
let timestamp = OffsetDateTime::now_local().unwrap_or_else(|_| OffsetDateTime::now_utc());
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let date_str = timestamp
|
||||
.format(format)
|
||||
.map_err(|e| anyhow::anyhow!("failed to format timestamp: {e}"))?;
|
||||
let mut dir = codex_home.to_path_buf();
|
||||
dir.push(crate::rollout::SESSIONS_SUBDIR);
|
||||
dir.push(timestamp.year().to_string());
|
||||
dir.push(format!("{:02}", u8::from(timestamp.month())));
|
||||
dir.push(format!("{:02}", timestamp.day()));
|
||||
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
|
||||
Ok(dir.join(filename))
|
||||
}
|
||||
|
||||
fn share_meta_path_for_rollout_path(path: &Path) -> PathBuf {
|
||||
let file_name = path.file_name().unwrap_or_else(|| OsStr::new("session"));
|
||||
let mut name = OsString::from(file_name);
|
||||
name.push(".share-meta.json");
|
||||
path.with_file_name(name)
|
||||
}
|
||||
|
||||
impl HttpObjectStore {
|
||||
fn object_url(&self, key: &str) -> anyhow::Result<Url> {
|
||||
let mut base = self.base_url.clone();
|
||||
let query = base.query().map(str::to_string);
|
||||
base.set_query(None);
|
||||
let mut joined = base
|
||||
.join(key)
|
||||
.with_context(|| format!("failed to build object URL for key {key}"))?;
|
||||
if let Some(query) = query {
|
||||
joined.set_query(Some(&query));
|
||||
}
|
||||
Ok(joined)
|
||||
}
|
||||
|
||||
async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self.client.head(url).send().await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(false),
|
||||
StatusCode::METHOD_NOT_ALLOWED | StatusCode::NOT_IMPLEMENTED => {
|
||||
self.object_exists_via_get(key).await
|
||||
}
|
||||
status if status.is_success() => Ok(true),
|
||||
status => Err(anyhow::anyhow!(
|
||||
"object store HEAD failed with status {status}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn object_exists_via_get(&self, key: &str) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.client
|
||||
.get(url)
|
||||
.header(reqwest::header::RANGE, "bytes=0-0")
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(false),
|
||||
StatusCode::PARTIAL_CONTENT | StatusCode::OK => Ok(true),
|
||||
status => Err(anyhow::anyhow!(
|
||||
"object store GET probe failed with status {status}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.client
|
||||
.put(url)
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(data)
|
||||
.send()
|
||||
.await?;
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!(
|
||||
"object store PUT failed with status {}",
|
||||
response.status()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_if_absent(
|
||||
&self,
|
||||
key: &str,
|
||||
data: Vec<u8>,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.client
|
||||
.put(url)
|
||||
.header(reqwest::header::IF_NONE_MATCH, "*")
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(data)
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
status if status.is_success() => Ok(true),
|
||||
StatusCode::PRECONDITION_FAILED | StatusCode::CONFLICT => Ok(false),
|
||||
status => Err(anyhow::anyhow!(
|
||||
"object store conditional PUT failed with status {status}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_file(
|
||||
&self,
|
||||
key: &str,
|
||||
path: &Path,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let url = self.object_url(key)?;
|
||||
let file = tokio::fs::File::open(path)
|
||||
.await
|
||||
.with_context(|| format!("failed to open rollout at {}", path.display()))?;
|
||||
let stream = ReaderStream::new(file);
|
||||
let response = self
|
||||
.client
|
||||
.put(url)
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(reqwest::Body::wrap_stream(stream))
|
||||
.send()
|
||||
.await?;
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!(
|
||||
"object store PUT failed with status {}",
|
||||
response.status()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self.client.get(url).send().await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(None),
|
||||
status if status.is_success() => {
|
||||
let bytes = response.bytes().await?;
|
||||
Ok(Some(bytes.to_vec()))
|
||||
}
|
||||
status => Err(anyhow::anyhow!(
|
||||
"object store GET failed with status {status}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self.client.get(url).send().await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(false),
|
||||
status if status.is_success() => {
|
||||
write_response_to_file(path, response).await?;
|
||||
Ok(true)
|
||||
}
|
||||
status => Err(anyhow::anyhow!(
|
||||
"object store GET failed with status {status}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AzureObjectStore {
|
||||
fn new(url: &Url) -> anyhow::Result<Self> {
|
||||
let endpoint = azure_endpoint(url)?;
|
||||
let (container, prefix) = azure_container_and_prefix(url)?;
|
||||
let sas_query = url.query().map(str::to_string);
|
||||
let credential = if sas_query.is_some() {
|
||||
None
|
||||
} else {
|
||||
let credential: Arc<dyn TokenCredential> = Arc::new(AzureCliCredential::new());
|
||||
Some(credential)
|
||||
};
|
||||
Ok(Self {
|
||||
endpoint,
|
||||
container,
|
||||
prefix,
|
||||
sas_query,
|
||||
client: reqwest::Client::new(),
|
||||
credential,
|
||||
})
|
||||
}
|
||||
|
||||
fn new_from_az(url: &Url) -> anyhow::Result<Self> {
|
||||
let account = url
|
||||
.host_str()
|
||||
.ok_or_else(|| anyhow::anyhow!("az url missing account name"))?;
|
||||
let endpoint = azure_endpoint_for_account(account)?;
|
||||
let (container, prefix) = azure_container_and_prefix(url)?;
|
||||
let sas_query = url.query().map(str::to_string);
|
||||
let credential = if sas_query.is_some() {
|
||||
None
|
||||
} else {
|
||||
let credential: Arc<dyn TokenCredential> = Arc::new(AzureCliCredential::new());
|
||||
Some(credential)
|
||||
};
|
||||
Ok(Self {
|
||||
endpoint,
|
||||
container,
|
||||
prefix,
|
||||
sas_query,
|
||||
client: reqwest::Client::new(),
|
||||
credential,
|
||||
})
|
||||
}
|
||||
|
||||
fn object_url(&self, key: &str) -> anyhow::Result<Url> {
|
||||
let full_key = join_prefix(&self.prefix, key);
|
||||
let mut url = self.endpoint.clone();
|
||||
if full_key.is_empty() {
|
||||
url.set_path(&format!("/{}", self.container));
|
||||
} else {
|
||||
url.set_path(&format!("/{}/{}", self.container, full_key));
|
||||
}
|
||||
if let Some(query) = &self.sas_query {
|
||||
url.set_query(Some(query));
|
||||
}
|
||||
Ok(url)
|
||||
}
|
||||
|
||||
async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.authorized_request(self.client.head(url))
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(false),
|
||||
status if status.is_success() => Ok(true),
|
||||
status => Err(anyhow::anyhow!(
|
||||
"azure blob HEAD failed with status {status}{}",
|
||||
azure_response_context(response.headers())
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.authorized_request(
|
||||
self.client
|
||||
.put(url)
|
||||
.header("x-ms-blob-type", "BlockBlob")
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(data),
|
||||
)
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
let status = response.status();
|
||||
let headers = azure_response_context(response.headers());
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
let body_snippet = azure_response_body_snippet(&body);
|
||||
Err(anyhow::anyhow!(
|
||||
"azure blob PUT failed with status {status}{headers}{body_snippet}"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_if_absent(
|
||||
&self,
|
||||
key: &str,
|
||||
data: Vec<u8>,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.authorized_request(
|
||||
self.client
|
||||
.put(url)
|
||||
.header("x-ms-blob-type", "BlockBlob")
|
||||
.header(reqwest::header::IF_NONE_MATCH, "*")
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(data),
|
||||
)
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
status if status.is_success() => Ok(true),
|
||||
StatusCode::PRECONDITION_FAILED => Ok(false),
|
||||
status => Err(anyhow::anyhow!(
|
||||
"azure blob conditional PUT failed with status {status}{}",
|
||||
azure_response_context(response.headers())
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_object_file(
|
||||
&self,
|
||||
key: &str,
|
||||
path: &Path,
|
||||
content_type: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let url = self.object_url(key)?;
|
||||
let file = tokio::fs::File::open(path)
|
||||
.await
|
||||
.with_context(|| format!("failed to open rollout at {}", path.display()))?;
|
||||
let stream = ReaderStream::new(file);
|
||||
let response = self
|
||||
.authorized_request(
|
||||
self.client
|
||||
.put(url)
|
||||
.header("x-ms-blob-type", "BlockBlob")
|
||||
.header(reqwest::header::CONTENT_TYPE, content_type)
|
||||
.body(reqwest::Body::wrap_stream(stream)),
|
||||
)
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
let status = response.status();
|
||||
let headers = azure_response_context(response.headers());
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
let body_snippet = azure_response_body_snippet(&body);
|
||||
Err(anyhow::anyhow!(
|
||||
"azure blob PUT failed with status {status}{headers}{body_snippet}"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.authorized_request(self.client.get(url))
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(None),
|
||||
status if status.is_success() => {
|
||||
let bytes = response.bytes().await?;
|
||||
Ok(Some(bytes.to_vec()))
|
||||
}
|
||||
status => Err(anyhow::anyhow!(
|
||||
"azure blob GET failed with status {status}{}",
|
||||
azure_response_context(response.headers())
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
|
||||
let url = self.object_url(key)?;
|
||||
let response = self
|
||||
.authorized_request(self.client.get(url))
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
match response.status() {
|
||||
StatusCode::NOT_FOUND => Ok(false),
|
||||
status if status.is_success() => {
|
||||
write_response_to_file(path, response).await?;
|
||||
Ok(true)
|
||||
}
|
||||
status => Err(anyhow::anyhow!(
|
||||
"azure blob GET failed with status {status}{}",
|
||||
azure_response_context(response.headers())
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_trailing_slash(url: &mut Url) {
|
||||
let path = url.path();
|
||||
if path.ends_with('/') {
|
||||
return;
|
||||
}
|
||||
let trimmed = path.trim_end_matches('/');
|
||||
let new_path = if trimmed.is_empty() {
|
||||
"/".to_string()
|
||||
} else {
|
||||
format!("{trimmed}/")
|
||||
};
|
||||
url.set_path(&new_path);
|
||||
}
|
||||
|
||||
fn join_prefix(prefix: &str, key: &str) -> String {
|
||||
if prefix.is_empty() {
|
||||
key.to_string()
|
||||
} else {
|
||||
format!("{prefix}/{key}")
|
||||
}
|
||||
}
|
||||
|
||||
fn is_azure_blob_url(url: &Url) -> bool {
|
||||
let Some(host) = url.host_str() else {
|
||||
return false;
|
||||
};
|
||||
host.ends_with(".blob.core.windows.net")
|
||||
}
|
||||
|
||||
fn azure_endpoint(url: &Url) -> anyhow::Result<Url> {
|
||||
let mut endpoint = url.clone();
|
||||
endpoint.set_path("/");
|
||||
endpoint.set_query(None);
|
||||
endpoint.set_fragment(None);
|
||||
Ok(endpoint)
|
||||
}
|
||||
|
||||
fn azure_container_and_prefix(url: &Url) -> anyhow::Result<(String, String)> {
|
||||
let segments = url
|
||||
.path_segments()
|
||||
.map(|iter| {
|
||||
iter.filter(|segment| !segment.is_empty())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
azure_container_and_prefix_from_segments(&segments)
|
||||
}
|
||||
|
||||
fn azure_container_and_prefix_from_segments(segments: &[&str]) -> anyhow::Result<(String, String)> {
|
||||
if segments.is_empty() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"azure blob url must include a container name"
|
||||
));
|
||||
}
|
||||
let container = segments[0].to_string();
|
||||
let prefix = segments[1..].join("/");
|
||||
Ok((container, prefix))
|
||||
}
|
||||
|
||||
fn azure_request(builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
|
||||
builder.header("x-ms-version", "2021-08-06")
|
||||
}
|
||||
|
||||
fn azure_endpoint_for_account(account: &str) -> anyhow::Result<Url> {
|
||||
let endpoint = format!("https://{account}.blob.core.windows.net/");
|
||||
Url::parse(&endpoint).with_context(|| "failed to build azure blob endpoint")
|
||||
}
|
||||
|
||||
impl AzureObjectStore {
|
||||
async fn authorized_request(
|
||||
&self,
|
||||
builder: reqwest::RequestBuilder,
|
||||
) -> anyhow::Result<reqwest::RequestBuilder> {
|
||||
let builder = azure_request(builder);
|
||||
let Some(credential) = &self.credential else {
|
||||
return Ok(builder);
|
||||
};
|
||||
let token = credential
|
||||
.get_token(&[AZURE_STORAGE_SCOPE])
|
||||
.await
|
||||
.with_context(|| "failed to acquire azure blob access token")?;
|
||||
Ok(builder.bearer_auth(token.token.secret()))
|
||||
}
|
||||
}
|
||||
|
||||
fn azure_response_context(headers: &reqwest::header::HeaderMap) -> String {
|
||||
let mut parts = Vec::new();
|
||||
if let Some(value) = azure_header_value(headers, "x-ms-error-code") {
|
||||
parts.push(format!("x-ms-error-code={value}"));
|
||||
}
|
||||
if let Some(value) = azure_header_value(headers, "x-ms-request-id") {
|
||||
parts.push(format!("x-ms-request-id={value}"));
|
||||
}
|
||||
if let Some(value) = azure_header_value(headers, "www-authenticate") {
|
||||
parts.push(format!("www-authenticate={value}"));
|
||||
}
|
||||
if parts.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
format!(" ({})", parts.join(", "))
|
||||
}
|
||||
}
|
||||
|
||||
fn azure_header_value(headers: &reqwest::header::HeaderMap, name: &str) -> Option<String> {
|
||||
headers
|
||||
.get(name)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(ToString::to_string)
|
||||
}
|
||||
|
||||
fn azure_response_body_snippet(body: &str) -> String {
|
||||
let trimmed = body.trim();
|
||||
if trimmed.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
let snippet = if trimmed.len() <= 512 {
|
||||
trimmed.to_string()
|
||||
} else {
|
||||
let truncated: String = trimmed.chars().take(512).collect();
|
||||
format!("{truncated}...")
|
||||
};
|
||||
format!(" (body={snippet})")
|
||||
}
|
||||
|
||||
async fn write_response_to_file(path: &Path, response: reqwest::Response) -> anyhow::Result<()> {
|
||||
let path = path.to_path_buf();
|
||||
let parent = path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("failed to resolve rollout directory"))?;
|
||||
tokio::fs::create_dir_all(parent)
|
||||
.await
|
||||
.with_context(|| format!("failed to create rollout directory {}", parent.display()))?;
|
||||
let result = async {
|
||||
let mut file = tokio::fs::File::create(&path)
|
||||
.await
|
||||
.with_context(|| format!("failed to create rollout file {}", path.display()))?;
|
||||
let mut stream = response.bytes_stream();
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk?;
|
||||
file.write_all(&chunk)
|
||||
.await
|
||||
.with_context(|| format!("failed to write rollout file {}", path.display()))?;
|
||||
}
|
||||
file.flush()
|
||||
.await
|
||||
.with_context(|| format!("failed to flush rollout file {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
let _ = tokio::fs::remove_file(&path).await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup_downloaded_rollout(path: &Path, meta_path: &Path) {
|
||||
let _ = tokio::fs::remove_file(path).await;
|
||||
let _ = tokio::fs::remove_file(meta_path).await;
|
||||
}
|
||||
@@ -73,6 +73,8 @@ ignore = [
|
||||
{ id = "RUSTSEC-2024-0388", reason = "derivative is unmaintained; pulled in via starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" },
|
||||
{ id = "RUSTSEC-2025-0057", reason = "fxhash is unmaintained; pulled in via starlark_map/starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" },
|
||||
{ id = "RUSTSEC-2024-0436", reason = "paste is unmaintained; pulled in via ratatui/rmcp/starlark used by tui/execpolicy; no fixed release yet" },
|
||||
{ id = "RUSTSEC-2025-0134", reason = "rustls-pemfile is unmaintained; pulled in via rama-tls-rustls used by codex-network-proxy; no safe upgrade until rama removes the dependency" },
|
||||
{ id = "RUSTSEC-2024-0384", reason = "instant is unmaintained; pulled in via http-types -> futures-lite used by Azure auth; no safe upgrade available yet" },
|
||||
# TODO(joshka, nornagon): remove this exception when once we update the ratatui fork to a version that uses lru 0.13+.
|
||||
{ id = "RUSTSEC-2026-0002", reason = "lru 0.12.5 is pulled in via ratatui fork; cannot upgrade until the fork is updated" },
|
||||
]
|
||||
|
||||
@@ -55,6 +55,8 @@ use codex_core::protocol::SandboxPolicy;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_core::protocol::SkillErrorInfo;
|
||||
use codex_core::protocol::TokenUsage;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::session_share::local_share_owner;
|
||||
#[cfg(target_os = "windows")]
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_otel::OtelManager;
|
||||
@@ -1409,6 +1411,46 @@ impl App {
|
||||
AppEvent::OpenResumePicker => {
|
||||
match crate::resume_picker::run_resume_picker(tui, &self.config, false).await? {
|
||||
SessionSelection::Resume(path) => {
|
||||
let current_owner = self
|
||||
.auth_manager
|
||||
.auth_cached()
|
||||
.and_then(|auth| auth.get_account_email());
|
||||
match local_share_owner(&path) {
|
||||
Ok(Some(owner)) => {
|
||||
if current_owner.as_deref() != Some(owner.as_str()) {
|
||||
let session_id = read_session_meta_line(&path)
|
||||
.await
|
||||
.ok()
|
||||
.map(|meta| meta.meta.id.to_string());
|
||||
let (id_display, fork_hint) = match session_id {
|
||||
Some(id) => {
|
||||
(id.clone(), format!("Use `codex fork {id}` instead."))
|
||||
}
|
||||
None => (
|
||||
"this session".to_string(),
|
||||
"Use `codex fork` to select it instead.".to_string(),
|
||||
),
|
||||
};
|
||||
self.chat_widget.add_error_message(format!(
|
||||
"Cannot resume shared session {id_display} owned by {owner}. {fork_hint}"
|
||||
));
|
||||
return Ok(AppRunControl::Continue);
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
"failed to read shared session metadata; treating session as unshared"
|
||||
);
|
||||
self.chat_widget.add_info_message(
|
||||
"Share metadata unreadable; proceeding without owner check."
|
||||
.to_string(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let current_cwd = self.config.cwd.clone();
|
||||
let resume_cwd = match crate::resolve_cwd_for_resume_or_fork(
|
||||
tui,
|
||||
@@ -1416,6 +1458,7 @@ impl App {
|
||||
&path,
|
||||
CwdPromptAction::Resume,
|
||||
true,
|
||||
false,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
@@ -1634,7 +1677,16 @@ impl App {
|
||||
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
|
||||
}
|
||||
AppEvent::CodexOp(op) => {
|
||||
self.chat_widget.submit_op(op);
|
||||
if let Op::UserInputAnswer { id, response } = op {
|
||||
if crate::chatwidget::ChatWidget::is_share_request_id(&id) {
|
||||
self.chat_widget.handle_share_response(response);
|
||||
return Ok(AppRunControl::Continue);
|
||||
}
|
||||
self.chat_widget
|
||||
.submit_op(Op::UserInputAnswer { id, response });
|
||||
} else {
|
||||
self.chat_widget.submit_op(op);
|
||||
}
|
||||
}
|
||||
AppEvent::DiffResult(text) => {
|
||||
// Clear the in-progress state in the bottom pane
|
||||
|
||||
@@ -128,7 +128,11 @@ use codex_protocol::items::AgentMessageItem;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::local_image_label_text;
|
||||
use codex_protocol::parse_command::ParsedCommand;
|
||||
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
||||
use codex_protocol::request_user_input::RequestUserInputEvent;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestion;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_sleep_inhibitor::SleepInhibitor;
|
||||
@@ -157,6 +161,11 @@ const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan";
|
||||
const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode";
|
||||
const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan.";
|
||||
const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
|
||||
const SHARE_REQUEST_PREFIX: &str = "share:";
|
||||
const SHARE_SCOPE_QUESTION_ID: &str = "share_scope";
|
||||
const SHARE_EMAILS_QUESTION_ID: &str = "share_emails";
|
||||
const SHARE_SCOPE_EVERYONE_LABEL: &str = "Everyone using the same blob store";
|
||||
const SHARE_SCOPE_EMAIL_LABEL: &str = "Specific emails";
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::ConnectorsSnapshot;
|
||||
@@ -404,6 +413,95 @@ pub(crate) fn get_limits_duration(windows_minutes: i64) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum ShareScope {
|
||||
Everyone,
|
||||
Emails,
|
||||
}
|
||||
|
||||
impl ShareScope {
|
||||
fn label(self) -> &'static str {
|
||||
match self {
|
||||
ShareScope::Everyone => SHARE_SCOPE_EVERYONE_LABEL,
|
||||
ShareScope::Emails => SHARE_SCOPE_EMAIL_LABEL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_share_scope(response: &RequestUserInputResponse) -> ShareScope {
|
||||
response
|
||||
.answers
|
||||
.get(SHARE_SCOPE_QUESTION_ID)
|
||||
.and_then(selected_label)
|
||||
.map(|label| {
|
||||
if label == SHARE_SCOPE_EMAIL_LABEL {
|
||||
ShareScope::Emails
|
||||
} else {
|
||||
ShareScope::Everyone
|
||||
}
|
||||
})
|
||||
.unwrap_or(ShareScope::Everyone)
|
||||
}
|
||||
|
||||
fn parse_share_emails(response: &RequestUserInputResponse) -> Vec<String> {
|
||||
let note = response
|
||||
.answers
|
||||
.get(SHARE_EMAILS_QUESTION_ID)
|
||||
.and_then(extract_user_note);
|
||||
let Some(note) = note else {
|
||||
return Vec::new();
|
||||
};
|
||||
note.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|entry| !entry.is_empty())
|
||||
.map(ToString::to_string)
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn selected_label(answer: &RequestUserInputAnswer) -> Option<String> {
|
||||
answer
|
||||
.answers
|
||||
.iter()
|
||||
.find(|entry| !entry.starts_with("user_note: "))
|
||||
.cloned()
|
||||
}
|
||||
|
||||
fn extract_user_note(answer: &RequestUserInputAnswer) -> Option<String> {
|
||||
answer
|
||||
.answers
|
||||
.iter()
|
||||
.find_map(|entry| entry.strip_prefix("user_note: ").map(ToString::to_string))
|
||||
}
|
||||
|
||||
fn share_success_lines(
|
||||
remote_id: ThreadId,
|
||||
scope: ShareScope,
|
||||
emails: &[String],
|
||||
) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line<'static>> = Vec::new();
|
||||
lines.push(vec!["• ".dim(), "Session shared.".into()].into());
|
||||
lines.push(vec![" ".into(), "Access: ".dim(), scope.label().into()].into());
|
||||
if matches!(scope, ShareScope::Emails) {
|
||||
let email_list = if emails.is_empty() {
|
||||
"(none provided)".to_string()
|
||||
} else {
|
||||
emails.join(", ")
|
||||
};
|
||||
lines.push(vec![" ".into(), "Emails: ".dim(), email_list.into()].into());
|
||||
}
|
||||
let fork_command = format!("codex fork {remote_id}");
|
||||
lines.push(vec![" ".into(), "Fork with: ".dim(), fork_command.cyan()].into());
|
||||
lines.push(
|
||||
vec![
|
||||
" ".into(),
|
||||
"Remote session id: ".dim(),
|
||||
remote_id.to_string().cyan(),
|
||||
]
|
||||
.into(),
|
||||
);
|
||||
lines
|
||||
}
|
||||
|
||||
/// Common initialization parameters shared by all `ChatWidget` constructors.
|
||||
pub(crate) struct ChatWidgetInit {
|
||||
pub(crate) config: Config,
|
||||
@@ -3262,6 +3360,9 @@ impl ChatWidget {
|
||||
SlashCommand::Fork => {
|
||||
self.app_event_tx.send(AppEvent::ForkCurrentSession);
|
||||
}
|
||||
SlashCommand::Share => {
|
||||
self.start_share_flow();
|
||||
}
|
||||
SlashCommand::Init => {
|
||||
let init_target = self.config.cwd.join(DEFAULT_PROJECT_DOC_FILENAME);
|
||||
if init_target.exists() {
|
||||
@@ -3632,6 +3733,139 @@ impl ChatWidget {
|
||||
self.bottom_pane.show_view(Box::new(view));
|
||||
}
|
||||
|
||||
fn start_share_flow(&mut self) {
|
||||
let Some(thread_id) = self.thread_id else {
|
||||
self.add_error_message("Current session is not ready to share yet.".to_string());
|
||||
return;
|
||||
};
|
||||
let Some(rollout_path) = self.rollout_path() else {
|
||||
self.add_error_message("Current session is not ready to share yet.".to_string());
|
||||
return;
|
||||
};
|
||||
if !rollout_path.exists() {
|
||||
self.add_error_message("Current session is not ready to share yet.".to_string());
|
||||
return;
|
||||
}
|
||||
if self.config.session_object_storage_url.is_none()
|
||||
&& self.config.session_object_storage_url_cmd.is_none()
|
||||
{
|
||||
self.add_error_message(
|
||||
"Sharing requires `session_object_storage_url` or `session_object_storage_url_cmd` in config.toml."
|
||||
.to_string(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let request_id = format!("{SHARE_REQUEST_PREFIX}{thread_id}");
|
||||
let questions = vec![
|
||||
RequestUserInputQuestion {
|
||||
id: SHARE_SCOPE_QUESTION_ID.to_string(),
|
||||
header: "Share".to_string(),
|
||||
question: "Who should be able to open this shared session?".to_string(),
|
||||
is_secret: false,
|
||||
is_other: false,
|
||||
options: Some(vec![
|
||||
RequestUserInputQuestionOption {
|
||||
label: SHARE_SCOPE_EVERYONE_LABEL.to_string(),
|
||||
description: "Anyone with access to the same object store.".to_string(),
|
||||
},
|
||||
RequestUserInputQuestionOption {
|
||||
label: SHARE_SCOPE_EMAIL_LABEL.to_string(),
|
||||
description: "Restrict to a list of emails (not enforced yet).".to_string(),
|
||||
},
|
||||
]),
|
||||
},
|
||||
RequestUserInputQuestion {
|
||||
id: SHARE_EMAILS_QUESTION_ID.to_string(),
|
||||
header: "Emails".to_string(),
|
||||
question: "If sharing with specific emails, list them (comma-separated)."
|
||||
.to_string(),
|
||||
is_secret: false,
|
||||
is_other: false,
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
self.bottom_pane
|
||||
.push_user_input_request(RequestUserInputEvent {
|
||||
call_id: request_id.clone(),
|
||||
turn_id: request_id,
|
||||
questions,
|
||||
});
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn is_share_request_id(id: &str) -> bool {
|
||||
id.starts_with(SHARE_REQUEST_PREFIX)
|
||||
}
|
||||
|
||||
pub(crate) fn handle_share_response(&mut self, response: RequestUserInputResponse) {
|
||||
let scope = parse_share_scope(&response);
|
||||
let emails = parse_share_emails(&response);
|
||||
|
||||
// TODO: Enforce email-based access once we have a server-side auth check.
|
||||
|
||||
let owner = self
|
||||
.auth_manager
|
||||
.auth_cached()
|
||||
.and_then(|auth| auth.get_account_email());
|
||||
let Some(owner) = owner else {
|
||||
self.add_error_message(
|
||||
"Sharing requires a signed-in ChatGPT account with an email.".to_string(),
|
||||
);
|
||||
return;
|
||||
};
|
||||
let Some(thread_id) = self.thread_id else {
|
||||
self.add_error_message("Current session is not ready to share yet.".to_string());
|
||||
return;
|
||||
};
|
||||
let Some(rollout_path) = self.rollout_path() else {
|
||||
self.add_error_message("Current session is not ready to share yet.".to_string());
|
||||
return;
|
||||
};
|
||||
|
||||
self.add_info_message("Sharing current session…".to_string(), None);
|
||||
|
||||
let config = self.config.clone();
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let storage_url = match config.resolve_session_object_storage_url().await {
|
||||
Ok(Some(url)) => url,
|
||||
Ok(None) => {
|
||||
let cell = history_cell::new_error_event(
|
||||
"Sharing requires `session_object_storage_url` or `session_object_storage_url_cmd` in config.toml."
|
||||
.to_string(),
|
||||
);
|
||||
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let cell = history_cell::new_error_event(format!(
|
||||
"Failed to resolve session object storage URL: {err}"
|
||||
));
|
||||
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let result = codex_core::session_share::upload_rollout_with_owner(
|
||||
&storage_url,
|
||||
thread_id,
|
||||
&owner,
|
||||
&rollout_path,
|
||||
)
|
||||
.await;
|
||||
let cell = match result {
|
||||
Ok(result) => {
|
||||
let lines = share_success_lines(result.remote_id, scope, &emails);
|
||||
PlainHistoryCell::new(lines)
|
||||
}
|
||||
Err(err) => {
|
||||
history_cell::new_error_event(format!("Failed to share session: {err}"))
|
||||
}
|
||||
};
|
||||
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn handle_paste(&mut self, text: String) {
|
||||
self.bottom_pane.handle_paste(text);
|
||||
}
|
||||
|
||||
@@ -32,8 +32,11 @@ use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::path_utils;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::session_share::download_rollout_if_available;
|
||||
use codex_core::session_share::local_share_owner;
|
||||
use codex_core::terminal::Multiplexer;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::AltScreenMode;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
@@ -49,6 +52,7 @@ use std::fs::OpenOptions;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tracing::error;
|
||||
use tracing::warn;
|
||||
use tracing_appender::non_blocking;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::prelude::*;
|
||||
@@ -544,8 +548,8 @@ async fn run_ratatui_app(
|
||||
} else {
|
||||
initial_config
|
||||
};
|
||||
let mut missing_session_exit = |id_str: &str, action: &str| {
|
||||
error!("Error finding conversation path: {id_str}");
|
||||
let fatal_exit = |tui: &mut Tui, message: String| {
|
||||
error!("{message}");
|
||||
restore();
|
||||
session_log::log_session_end();
|
||||
let _ = tui.terminal.clear();
|
||||
@@ -554,13 +558,20 @@ async fn run_ratatui_app(
|
||||
thread_id: None,
|
||||
thread_name: None,
|
||||
update_action: None,
|
||||
exit_reason: ExitReason::Fatal(format!(
|
||||
"No saved session found with ID {id_str}. Run `codex {action}` without an ID to choose from existing sessions."
|
||||
)),
|
||||
exit_reason: ExitReason::Fatal(message),
|
||||
})
|
||||
};
|
||||
let missing_session_exit = |tui: &mut Tui, id_str: &str, action: &str| {
|
||||
fatal_exit(
|
||||
tui,
|
||||
format!(
|
||||
"No saved session found with ID {id_str}. Run `codex {action}` without an ID to choose from existing sessions."
|
||||
),
|
||||
)
|
||||
};
|
||||
|
||||
let use_fork = cli.fork_picker || cli.fork_last || cli.fork_session_id.is_some();
|
||||
let mut remote_fork_downloaded = false;
|
||||
let session_selection = if use_fork {
|
||||
if let Some(id_str) = cli.fork_session_id.as_deref() {
|
||||
let is_uuid = Uuid::parse_str(id_str).is_ok();
|
||||
@@ -571,7 +582,40 @@ async fn run_ratatui_app(
|
||||
};
|
||||
match path {
|
||||
Some(path) => resume_picker::SessionSelection::Fork(path),
|
||||
None => return missing_session_exit(id_str, "fork"),
|
||||
None => {
|
||||
let storage_url = match config.resolve_session_object_storage_url().await {
|
||||
Ok(Some(url)) => url,
|
||||
Ok(None) => return missing_session_exit(&mut tui, id_str, "fork"),
|
||||
Err(err) => {
|
||||
return fatal_exit(
|
||||
&mut tui,
|
||||
format!("Failed to resolve session object storage URL: {err}"),
|
||||
);
|
||||
}
|
||||
};
|
||||
let Ok(session_id) = ThreadId::from_string(id_str) else {
|
||||
return missing_session_exit(&mut tui, id_str, "fork");
|
||||
};
|
||||
match download_rollout_if_available(
|
||||
&storage_url,
|
||||
session_id,
|
||||
&config.codex_home,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => {
|
||||
remote_fork_downloaded = true;
|
||||
resume_picker::SessionSelection::Fork(path)
|
||||
}
|
||||
Ok(None) => return missing_session_exit(&mut tui, id_str, "fork"),
|
||||
Err(err) => {
|
||||
return fatal_exit(
|
||||
&mut tui,
|
||||
format!("Failed to fetch remote session {id_str}: {err}"),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if cli.fork_last {
|
||||
let provider_filter = vec![config.model_provider_id.clone()];
|
||||
@@ -620,7 +664,7 @@ async fn run_ratatui_app(
|
||||
};
|
||||
match path {
|
||||
Some(path) => resume_picker::SessionSelection::Resume(path),
|
||||
None => return missing_session_exit(id_str, "resume"),
|
||||
None => return missing_session_exit(&mut tui, id_str, "resume"),
|
||||
}
|
||||
} else if cli.resume_last {
|
||||
let provider_filter = vec![config.model_provider_id.clone()];
|
||||
@@ -663,6 +707,43 @@ async fn run_ratatui_app(
|
||||
resume_picker::SessionSelection::StartFresh
|
||||
};
|
||||
|
||||
let current_owner = auth_manager
|
||||
.auth_cached()
|
||||
.and_then(|auth| auth.get_account_email());
|
||||
|
||||
if let resume_picker::SessionSelection::Resume(path) = &session_selection {
|
||||
match local_share_owner(path) {
|
||||
Ok(Some(owner)) => {
|
||||
if current_owner.as_deref() != Some(owner.as_str()) {
|
||||
let session_id = read_session_meta_line(path)
|
||||
.await
|
||||
.ok()
|
||||
.map(|meta| meta.meta.id.to_string());
|
||||
let (id_display, fork_hint) = match session_id {
|
||||
Some(id) => (id.clone(), format!("Use `codex fork {id}` instead.")),
|
||||
None => (
|
||||
"this session".to_string(),
|
||||
"Use `codex fork` to select it instead.".to_string(),
|
||||
),
|
||||
};
|
||||
return fatal_exit(
|
||||
&mut tui,
|
||||
format!(
|
||||
"Cannot resume shared session {id_display} owned by {owner}. {fork_hint}"
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
error = %err,
|
||||
"failed to read shared session metadata; treating session as unshared"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let current_cwd = config.cwd.clone();
|
||||
let allow_prompt = cli.cwd.is_none();
|
||||
let action_and_path_if_resume_or_fork = match &session_selection {
|
||||
@@ -672,8 +753,25 @@ async fn run_ratatui_app(
|
||||
};
|
||||
let fallback_cwd = match action_and_path_if_resume_or_fork {
|
||||
Some((action, path)) => {
|
||||
resolve_cwd_for_resume_or_fork(&mut tui, ¤t_cwd, path, action, allow_prompt)
|
||||
.await?
|
||||
let shared_owner = match action {
|
||||
CwdPromptAction::Fork => local_share_owner(path).ok().flatten(),
|
||||
CwdPromptAction::Resume => None,
|
||||
};
|
||||
let shared_by_other = shared_owner
|
||||
.as_deref()
|
||||
.map(|owner| current_owner.as_deref() != Some(owner))
|
||||
.unwrap_or(false);
|
||||
let prefer_current_cwd =
|
||||
(remote_fork_downloaded || shared_by_other) && action == CwdPromptAction::Fork;
|
||||
resolve_cwd_for_resume_or_fork(
|
||||
&mut tui,
|
||||
¤t_cwd,
|
||||
path,
|
||||
action,
|
||||
allow_prompt,
|
||||
prefer_current_cwd,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
@@ -786,10 +884,17 @@ pub(crate) async fn resolve_cwd_for_resume_or_fork(
|
||||
path: &Path,
|
||||
action: CwdPromptAction,
|
||||
allow_prompt: bool,
|
||||
prefer_current_cwd: bool,
|
||||
) -> color_eyre::Result<Option<PathBuf>> {
|
||||
if prefer_current_cwd {
|
||||
return Ok(Some(current_cwd.to_path_buf()));
|
||||
}
|
||||
let Some(history_cwd) = read_session_cwd(path).await else {
|
||||
return Ok(None);
|
||||
};
|
||||
if action == CwdPromptAction::Fork && !history_cwd.exists() {
|
||||
return Ok(Some(current_cwd.to_path_buf()));
|
||||
}
|
||||
if allow_prompt && cwds_differ(current_cwd, &history_cwd) {
|
||||
let selection =
|
||||
cwd_prompt::run_cwd_selection_prompt(tui, action, current_cwd, &history_cwd).await?;
|
||||
|
||||
@@ -26,6 +26,7 @@ pub enum SlashCommand {
|
||||
New,
|
||||
Resume,
|
||||
Fork,
|
||||
Share,
|
||||
Init,
|
||||
Compact,
|
||||
Plan,
|
||||
@@ -67,6 +68,7 @@ impl SlashCommand {
|
||||
SlashCommand::Rename => "rename the current thread",
|
||||
SlashCommand::Resume => "resume a saved chat",
|
||||
SlashCommand::Fork => "fork the current chat",
|
||||
SlashCommand::Share => "share the current chat via object storage",
|
||||
// SlashCommand::Undo => "ask Codex to undo a turn",
|
||||
SlashCommand::Quit | SlashCommand::Exit => "exit Codex",
|
||||
SlashCommand::Diff => "show git diff (including untracked files)",
|
||||
@@ -122,6 +124,7 @@ impl SlashCommand {
|
||||
SlashCommand::New
|
||||
| SlashCommand::Resume
|
||||
| SlashCommand::Fork
|
||||
| SlashCommand::Share
|
||||
| SlashCommand::Init
|
||||
| SlashCommand::Compact
|
||||
// | SlashCommand::Undo
|
||||
|
||||
@@ -8,6 +8,6 @@ license.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
assert_cmd = { workspace = true }
|
||||
path-absolutize = { workspace = true }
|
||||
runfiles = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
use path_absolutize::Absolutize;
|
||||
use std::env;
|
||||
use std::ffi;
|
||||
use std::ffi::OsString;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
@@ -43,27 +46,13 @@ pub fn cargo_bin(name: &str) -> Result<PathBuf, CargoBinError> {
|
||||
return resolve_bin_from_env(key, value);
|
||||
}
|
||||
}
|
||||
match assert_cmd::Command::cargo_bin(name) {
|
||||
Ok(cmd) => {
|
||||
let mut path = PathBuf::from(cmd.get_program());
|
||||
if !path.is_absolute() {
|
||||
path = std::env::current_dir()
|
||||
.map_err(|source| CargoBinError::CurrentDir { source })?
|
||||
.join(path);
|
||||
}
|
||||
if path.exists() {
|
||||
Ok(path)
|
||||
} else {
|
||||
Err(CargoBinError::ResolvedPathDoesNotExist {
|
||||
key: "assert_cmd::Command::cargo_bin".to_owned(),
|
||||
path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
match resolve_bin_from_target_dir(name) {
|
||||
Ok(path) => Ok(path),
|
||||
Err(err) => Err(CargoBinError::NotFound {
|
||||
name: name.to_owned(),
|
||||
env_keys,
|
||||
fallback: format!("assert_cmd fallback failed: {err}"),
|
||||
fallback: format!("target dir fallback failed: {err}"),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -106,6 +95,41 @@ fn resolve_bin_from_env(key: &str, value: OsString) -> Result<PathBuf, CargoBinE
|
||||
})
|
||||
}
|
||||
|
||||
fn resolve_bin_from_target_dir(name: &str) -> Result<PathBuf, CargoBinError> {
|
||||
let target_dir = cargo_target_dir()?;
|
||||
let filename = format!("{name}{}", env::consts::EXE_SUFFIX);
|
||||
let candidate = target_dir.join(filename);
|
||||
let abs = absolutize_from_buck_or_cwd(candidate.clone())?;
|
||||
if abs.exists() {
|
||||
Ok(abs)
|
||||
} else {
|
||||
Err(CargoBinError::ResolvedPathDoesNotExist {
|
||||
key: "target_dir".to_owned(),
|
||||
path: candidate,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn cargo_target_dir() -> Result<PathBuf, CargoBinError> {
|
||||
let current_exe = env::current_exe().map_err(|source| CargoBinError::CurrentExe { source })?;
|
||||
let mut path = current_exe.parent().map(PathBuf::from).ok_or_else(|| {
|
||||
CargoBinError::ResolvedPathDoesNotExist {
|
||||
key: "current_exe".to_owned(),
|
||||
path: current_exe.clone(),
|
||||
}
|
||||
})?;
|
||||
if path.ends_with(ffi::OsStr::new("deps")) {
|
||||
path.pop();
|
||||
}
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
fn absolutize_from_buck_or_cwd(path: PathBuf) -> Result<PathBuf, CargoBinError> {
|
||||
path.absolutize()
|
||||
.map(std::borrow::Cow::into_owned)
|
||||
.map_err(|source| CargoBinError::CurrentDir { source })
|
||||
}
|
||||
|
||||
/// Macro that derives the path to a test resource at runtime, the value of
|
||||
/// which depends on whether Cargo or Bazel is being used to build and run a
|
||||
/// test. Note the return value may be a relative or absolute path.
|
||||
|
||||
@@ -24,6 +24,40 @@ Codex can run a notification hook when the agent finishes a turn. See the config
|
||||
|
||||
- https://developers.openai.com/codex/config-reference
|
||||
|
||||
## Session sharing (enterprise)
|
||||
|
||||
To enable `/share` for enterprise or self-hosted storage, configure:
|
||||
|
||||
```
|
||||
session_object_storage_url = "https://your-object-store.example.com/codex-sessions/"
|
||||
```
|
||||
|
||||
If you need a dynamic URL (for example, a short-lived token), you can provide a command
|
||||
that prints the URL to stdout:
|
||||
|
||||
```
|
||||
session_object_storage_url_cmd = ["bash", "-lc", "get-session-store-url"]
|
||||
```
|
||||
|
||||
If both `session_object_storage_url` and `session_object_storage_url_cmd` are set, the
|
||||
static URL wins.
|
||||
|
||||
You can also use Azure Blob Storage with a SAS URL, either as a standard HTTPS URL or the shorthand `az://` form:
|
||||
|
||||
```
|
||||
session_object_storage_url = "https://<account>.blob.core.windows.net/<container>/codex-sessions?<sas>"
|
||||
```
|
||||
|
||||
```
|
||||
session_object_storage_url = "az://<account>/<container>/codex-sessions?<sas>"
|
||||
```
|
||||
|
||||
For Azure, the SAS token must allow read and write access to blob objects under the prefix. Listing is not required.
|
||||
|
||||
If you omit the SAS token, Codex will try Azure CLI authentication (`az login`) and request storage scope tokens for the blob API.
|
||||
|
||||
For HTTP/HTTPS URLs, the endpoint should support `HEAD`/`PUT` for individual objects. Codex will upload the session rollout (`.jsonl`) under this prefix when sharing.
|
||||
|
||||
## JSON Schema
|
||||
|
||||
The generated JSON Schema for `config.toml` lives at `codex-rs/core/config.schema.json`.
|
||||
|
||||
Reference in New Issue
Block a user