Compare commits

...

29 Commits

Author SHA1 Message Date
Charles Cunningham
18a35e1d50 Simplify storage URL timeout handling 2026-02-17 10:53:10 -08:00
Charles Cunningham
06862a36ea Treat unreadable share metadata as unshared 2026-02-17 10:51:32 -08:00
Charles Cunningham
f022b4dcdd Fallback to UTC for rollout download timestamps 2026-02-17 10:49:35 -08:00
Charles Cunningham
de87a5a7ad Fix async storage URL resolution return 2026-02-17 09:24:08 -08:00
Charles Cunningham
4525c785d0 Remove downloaded rollout on metadata failure 2026-02-16 23:42:02 -08:00
Charles Cunningham
9df71691d2 Resolve session storage URL async with timeout 2026-02-16 23:35:16 -08:00
Charles Cunningham
12f7ffdb66 Make share metadata creation atomic 2026-02-16 23:14:53 -08:00
Charles Cunningham
27b5e7c4be Re-resolve session storage URL commands 2026-02-16 17:42:23 -08:00
Charles Cunningham
21db430ab3 Require rollout file for sharing 2026-02-16 17:33:59 -08:00
Charles Cunningham
4db264bb7d Stream session share rollouts 2026-02-16 17:31:17 -08:00
Charles Cunningham
37a68eeb05 Use io::Error::other for command failures 2026-02-16 13:07:44 -08:00
Charles Cunningham
e2e83dce09 Block in-app resume for shared sessions 2026-02-16 13:07:44 -08:00
Charles Cunningham
68730de96d Reject overwrite when share metadata missing 2026-02-16 13:07:44 -08:00
Charles Cunningham
726369eaa6 Add command hook for session object store URL 2026-02-16 13:07:44 -08:00
Charles Cunningham
9c6cc81ef4 Fix share prompt fields after rebase 2026-02-16 13:07:44 -08:00
Charles Cunningham
ced5d77ad2 Skip cwd prompt for shared forks 2026-02-16 13:07:43 -08:00
Charles Cunningham
05cae8a1ec Fix remote fork cwd + enforce shared ownership 2026-02-16 13:07:43 -08:00
Charles Cunningham
ffa7644f80 lint 2026-02-16 13:07:43 -08:00
Charles Cunningham
8d542f890f lint 2026-02-16 13:07:43 -08:00
Charles Cunningham
23855a07f3 Check metadata even when the rollout blob is missing, and enforce ownership in that case 2026-02-16 13:07:43 -08:00
Charles Cunningham
99f190bf91 Prefer current cwd if remote downloaded 2026-02-16 13:07:43 -08:00
Charles Cunningham
0c2c47aa29 Preserve query params when building HTTP object URLs 2026-02-16 13:07:43 -08:00
Charles Cunningham
0f3c89d02b Fix /share user input handling after rebase 2026-02-16 13:07:43 -08:00
Charles Cunningham
cd265a3093 Fix cargo deny 2026-02-16 13:07:43 -08:00
Charles Cunningham
275a6b7300 Remove AWS support for now to make CI happy 2026-02-16 13:07:43 -08:00
Charles Cunningham
45ce36de1f Lint and fix 2026-02-16 13:07:43 -08:00
Charles Cunningham
16cfee1935 Fork remotely stored session 2026-02-16 13:07:43 -08:00
Charles Cunningham
f4c955af57 Lint 2026-02-16 13:07:43 -08:00
Charles Cunningham
4e199e6aeb Enterprise session sharing via object storage (/share) 2026-02-16 13:07:43 -08:00
16 changed files with 2324 additions and 416 deletions

176
MODULE.bazel.lock generated

File diff suppressed because one or more lines are too long

1037
codex-rs/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -152,6 +152,8 @@ async-channel = "2.3.1"
async-stream = "0.3.6"
async-trait = "0.1.89"
axum = { version = "0.8", default-features = false }
azure_core = "0.21"
azure_identity = "0.21"
base64 = "0.22.1"
bm25 = "2.3.2"
bytes = "1.10.1"

View File

@@ -23,6 +23,8 @@ arc-swap = "1.8.0"
async-channel = { workspace = true }
async-trait = { workspace = true }
askama = { workspace = true }
azure_core = { workspace = true }
azure_identity = { workspace = true }
base64 = { workspace = true }
bm25 = { workspace = true }
chardetng = { workspace = true }
@@ -102,7 +104,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-util = { workspace = true, features = ["rt"] }
tokio-util = { workspace = true, features = ["io", "rt"] }
tokio-tungstenite = { workspace = true }
toml = { workspace = true }
toml_edit = { workspace = true }

View File

@@ -1685,6 +1685,17 @@
],
"description": "Sandbox configuration to apply if `sandbox` is `WorkspaceWrite`."
},
"session_object_storage_url": {
"description": "Base URL for the object store used by /share (enterprise/self-hosted).",
"type": "string"
},
"session_object_storage_url_cmd": {
"description": "Optional command to produce the base URL for the object store used by /share. The command runs as argv and must print a non-empty URL to stdout.",
"items": {
"type": "string"
},
"type": "array"
},
"shell_environment_policy": {
"allOf": [
{

View File

@@ -78,6 +78,8 @@ use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
#[cfg(test)]
use tempfile::tempdir;
#[cfg(not(target_os = "macos"))]
@@ -109,6 +111,7 @@ pub use codex_git::GhostSnapshotConfig;
/// the context window.
pub(crate) const PROJECT_DOC_MAX_BYTES: usize = 32 * 1024; // 32 KiB
pub(crate) const DEFAULT_AGENT_MAX_THREADS: Option<usize> = Some(6);
const SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONFIG_TOML_FILE: &str = "config.toml";
@@ -346,6 +349,11 @@ pub struct Config {
/// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
pub chatgpt_base_url: String,
/// Optional base URL for storing shared session rollouts in an object store.
pub session_object_storage_url: Option<String>,
/// Optional command to produce the base URL for storing shared session rollouts.
pub session_object_storage_url_cmd: Option<Vec<String>>,
/// When set, restricts ChatGPT login to a specific workspace identifier.
pub forced_chatgpt_workspace_id: Option<String>,
@@ -947,6 +955,12 @@ pub struct ConfigToml {
/// When unset, Codex will bind to an ephemeral port chosen by the OS.
pub mcp_oauth_callback_port: Option<u16>,
/// Base URL for the object store used by /share (enterprise/self-hosted).
pub session_object_storage_url: Option<String>,
/// Optional command to produce the base URL for the object store used by /share.
/// The command runs as argv and must print a non-empty URL to stdout.
pub session_object_storage_url_cmd: Option<Vec<String>>,
/// User-defined provider entries that extend/override the built-in list.
#[serde(default)]
pub model_providers: HashMap<String, ModelProviderInfo>,
@@ -1508,6 +1522,15 @@ impl Config {
Some(WindowsSandboxModeToml::Unelevated) => WindowsSandboxLevel::RestrictedToken,
None => WindowsSandboxLevel::from_features(&features),
};
let session_object_storage_url =
cfg.session_object_storage_url.as_ref().and_then(|value| {
let trimmed = value.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
});
let mut sandbox_policy = cfg.derive_sandbox_policy(
sandbox_mode,
config_profile.sandbox_mode,
@@ -1815,6 +1838,8 @@ impl Config {
.chatgpt_base_url
.or(cfg.chatgpt_base_url)
.unwrap_or("https://chatgpt.com/backend-api/".to_string()),
session_object_storage_url,
session_object_storage_url_cmd: cfg.session_object_storage_url_cmd,
forced_chatgpt_workspace_id,
forced_login_method,
include_apply_patch_tool: include_apply_patch_tool_flag,
@@ -1881,6 +1906,35 @@ impl Config {
Ok(config)
}
/// Resolves the base URL of the object store used by `/share`.
///
/// A statically configured `session_object_storage_url` takes precedence.
/// Otherwise, if `session_object_storage_url_cmd` is set, that command is
/// executed on a blocking thread and must print a non-empty URL to stdout;
/// the run is bounded by `SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT`.
///
/// Returns `Ok(None)` when neither setting is configured.
///
/// # Errors
/// Propagates the command's failure (see `try_run_command_for_value`), maps a
/// task panic to `io::Error::other`, and returns `ErrorKind::TimedOut` when
/// the command exceeds the timeout.
pub async fn resolve_session_object_storage_url(&self) -> std::io::Result<Option<String>> {
    // Static configuration wins over the command hook.
    if let Some(url) = self.session_object_storage_url.as_ref() {
        return Ok(Some(url.clone()));
    }
    let Some(command) = self.session_object_storage_url_cmd.clone() else {
        return Ok(None);
    };
    // `Command::output()` blocks, so run it off the async runtime.
    let mut handle = tokio::task::spawn_blocking(move || {
        Self::try_run_command_for_value(Some(&command), "session_object_storage_url_cmd")
    });
    match tokio::time::timeout(SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT, &mut handle).await {
        // Outer `Err` means the spawned task panicked; the inner result is the
        // command's own success/failure, propagated as-is.
        Ok(result) => result.map_err(|err| {
            std::io::Error::other(format!("session_object_storage_url_cmd panicked: {err}"))
        })?,
        Err(_) => {
            // NOTE(review): aborting a spawn_blocking task does not interrupt
            // a closure that is already running, so a hung child process may
            // linger past the timeout — confirm this is acceptable.
            handle.abort();
            Err(std::io::Error::new(
                ErrorKind::TimedOut,
                format!(
                    "session_object_storage_url_cmd timed out after {}s",
                    SESSION_OBJECT_STORAGE_URL_CMD_TIMEOUT.as_secs()
                ),
            ))
        }
    }
}
fn load_instructions(codex_dir: Option<&Path>) -> Option<String> {
let base = codex_dir?;
for candidate in [LOCAL_PROJECT_DOC_FILENAME, DEFAULT_PROJECT_DOC_FILENAME] {
@@ -1925,6 +1979,59 @@ impl Config {
}
}
/// Runs `command` as argv and returns its trimmed stdout.
///
/// Returns `Ok(None)` when `command` is `None`. `context` names the config
/// key being resolved and prefixes every error message.
///
/// # Errors
/// - `InvalidInput` when the argv list is empty.
/// - The spawn error's kind when the process cannot be started.
/// - `Other` when the process exits unsuccessfully (trimmed stderr appended
///   when non-empty).
/// - `InvalidData` when stdout is not UTF-8 or is empty after trimming.
fn try_run_command_for_value(
    command: Option<&[String]>,
    context: &str,
) -> std::io::Result<Option<String>> {
    let Some(command) = command else {
        return Ok(None);
    };
    // Split argv into program + args; an empty list is a configuration error.
    // (The previous `command.get(1..)` if-let was redundant: after the empty
    // check it is always `Some`.)
    let Some((program, args)) = command.split_first() else {
        return Err(std::io::Error::new(
            ErrorKind::InvalidInput,
            format!("{context} is empty"),
        ));
    };
    let output = Command::new(program).args(args).output().map_err(|err| {
        std::io::Error::new(err.kind(), format!("{context} failed to start: {err}"))
    })?;
    if !output.status.success() {
        let status = output.status;
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stderr = stderr.trim();
        let message = if stderr.is_empty() {
            format!("{context} failed with status {status}")
        } else {
            format!("{context} failed with status {status}: {stderr}")
        };
        return Err(std::io::Error::other(message));
    }
    let stdout = String::from_utf8(output.stdout).map_err(|err| {
        std::io::Error::new(
            ErrorKind::InvalidData,
            format!("{context} output was not UTF-8: {err}"),
        )
    })?;
    let trimmed = stdout.trim();
    if trimmed.is_empty() {
        Err(std::io::Error::new(
            ErrorKind::InvalidData,
            format!("{context} returned empty output"),
        ))
    } else {
        Ok(Some(trimmed.to_string()))
    }
}
pub fn set_windows_sandbox_enabled(&mut self, value: bool) {
self.permissions.windows_sandbox_mode = if value {
Some(WindowsSandboxModeToml::Unelevated)
@@ -4133,6 +4240,8 @@ model_verbosity = "high"
model_verbosity: None,
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
session_object_storage_url: None,
session_object_storage_url_cmd: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4244,6 +4353,8 @@ model_verbosity = "high"
model_verbosity: None,
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
session_object_storage_url: None,
session_object_storage_url_cmd: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4353,6 +4464,8 @@ model_verbosity = "high"
model_verbosity: None,
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
session_object_storage_url: None,
session_object_storage_url_cmd: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4448,6 +4561,8 @@ model_verbosity = "high"
model_verbosity: Some(Verbosity::High),
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
session_object_storage_url: None,
session_object_storage_url_cmd: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,

View File

@@ -57,6 +57,7 @@ mod proposed_plan_parser;
mod sandbox_tags;
pub mod sandboxing;
mod session_prefix;
pub mod session_share;
mod shell_detect;
mod stream_events_utils;
mod tagged_block_parser;

View File

@@ -0,0 +1,882 @@
use std::ffi::OsStr;
use std::ffi::OsString;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use azure_core::auth::TokenCredential;
use azure_identity::AzureCliCredential;
use codex_protocol::ThreadId;
use futures::StreamExt;
use reqwest::StatusCode;
use reqwest::Url;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
/// Key prefix under which all shared-session objects are stored.
const SHARE_OBJECT_PREFIX: &str = "sessions";
/// Suffix of the rollout payload object (newline-delimited JSON).
const SHARE_OBJECT_SUFFIX: &str = ".jsonl";
/// Suffix of the sidecar ownership-metadata object.
const SHARE_META_SUFFIX: &str = ".meta.json";
/// OAuth scope requested when authenticating to Azure Blob Storage.
const AZURE_STORAGE_SCOPE: &str = "https://storage.azure.com/.default";

/// Outcome of a successful `/share` upload.
#[derive(Debug, Clone)]
pub struct SessionShareResult {
    /// Identifier the session is stored under remotely.
    pub remote_id: ThreadId,
    /// Fully qualified URL of the uploaded rollout object.
    pub object_url: Url,
}

/// Backend selected from the configured base URL (see `SessionObjectStore::new`).
#[derive(Debug, Clone)]
enum SessionObjectStore {
    /// Generic HTTP(S) object store addressed as base URL + key.
    Http(HttpObjectStore),
    /// Azure Blob Storage (`az://` URLs or `*.blob.core.windows.net` hosts).
    Azure(AzureObjectStore),
}

/// Plain HTTP(S) object store client.
#[derive(Debug, Clone)]
struct HttpObjectStore {
    // A trailing slash is ensured at construction so `Url::join` extends the
    // path instead of replacing its last segment.
    base_url: Url,
    client: reqwest::Client,
}

/// Azure Blob Storage client.
#[derive(Debug, Clone)]
struct AzureObjectStore {
    // Account endpoint, e.g. https://{account}.blob.core.windows.net/
    endpoint: Url,
    container: String,
    // Optional key prefix inside the container (may be empty).
    prefix: String,
    // SAS token query string; when present, bearer-token auth is skipped.
    sas_query: Option<String>,
    client: reqwest::Client,
    // Azure CLI credential, populated only when no SAS token is supplied.
    credential: Option<Arc<dyn TokenCredential>>,
}

/// Sidecar metadata enforcing ownership of a shared session.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionShareMeta {
    // Identity of the user who first shared the session.
    owner: String,
    // Unix timestamps in seconds.
    created_at: i64,
    updated_at: i64,
}
impl SessionObjectStore {
    /// Selects a backend from the configured base URL.
    ///
    /// - `az://account/container[/prefix]` → Azure
    /// - `http(s)` URLs on `*.blob.core.windows.net` → Azure
    /// - any other `http(s)` URL → generic HTTP store (trailing slash ensured)
    ///
    /// Any other scheme is rejected.
    pub async fn new(base_url: &str) -> anyhow::Result<Self> {
        let mut url = Url::parse(base_url)
            .with_context(|| format!("invalid session_object_storage_url: {base_url}"))?;
        match url.scheme() {
            "az" => Ok(SessionObjectStore::Azure(AzureObjectStore::new_from_az(
                &url,
            )?)),
            "http" | "https" => {
                if is_azure_blob_url(&url) {
                    Ok(SessionObjectStore::Azure(AzureObjectStore::new(&url)?))
                } else {
                    ensure_trailing_slash(&mut url);
                    Ok(SessionObjectStore::Http(HttpObjectStore {
                        base_url: url,
                        client: reqwest::Client::new(),
                    }))
                }
            }
            other => Err(anyhow::anyhow!(
                "unsupported session_object_storage_url scheme {other}"
            )),
        }
    }

    /// Absolute URL of the object stored under `key`.
    fn object_url(&self, key: &str) -> anyhow::Result<Url> {
        match self {
            SessionObjectStore::Http(store) => store.object_url(key),
            SessionObjectStore::Azure(store) => store.object_url(key),
        }
    }

    /// Whether an object exists under `key`.
    async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
        match self {
            SessionObjectStore::Http(store) => store.object_exists(key).await,
            SessionObjectStore::Azure(store) => store.object_exists(key).await,
        }
    }

    /// Unconditionally writes `data` under `key`.
    async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
        match self {
            SessionObjectStore::Http(store) => store.put_object(key, data, content_type).await,
            SessionObjectStore::Azure(store) => store.put_object(key, data, content_type).await,
        }
    }

    /// Conditional create (`If-None-Match: *`); returns `false` when the
    /// object already existed (i.e. this write lost the race).
    async fn put_object_if_absent(
        &self,
        key: &str,
        data: Vec<u8>,
        content_type: &str,
    ) -> anyhow::Result<bool> {
        match self {
            SessionObjectStore::Http(store) => {
                store.put_object_if_absent(key, data, content_type).await
            }
            SessionObjectStore::Azure(store) => {
                store.put_object_if_absent(key, data, content_type).await
            }
        }
    }

    /// Streams the file at `path` to the object under `key`.
    async fn put_object_file(
        &self,
        key: &str,
        path: &Path,
        content_type: &str,
    ) -> anyhow::Result<()> {
        match self {
            SessionObjectStore::Http(store) => store.put_object_file(key, path, content_type).await,
            SessionObjectStore::Azure(store) => {
                store.put_object_file(key, path, content_type).await
            }
        }
    }

    /// Reads the object under `key`; `None` when it does not exist.
    async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        match self {
            SessionObjectStore::Http(store) => store.get_object_bytes(key).await,
            SessionObjectStore::Azure(store) => store.get_object_bytes(key).await,
        }
    }

    /// Streams the object under `key` into `path`; returns `false` when the
    /// object does not exist.
    async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
        match self {
            SessionObjectStore::Http(store) => store.get_object_to_file(key, path).await,
            SessionObjectStore::Azure(store) => store.get_object_to_file(key, path).await,
        }
    }
}
pub async fn upload_rollout_with_owner(
base_url: &str,
session_id: ThreadId,
owner: &str,
rollout_path: &Path,
) -> anyhow::Result<SessionShareResult> {
let store = SessionObjectStore::new(base_url).await?;
let key = object_key(session_id);
let meta_key = meta_key(session_id);
let rollout_exists = store.object_exists(&key).await?;
let now = OffsetDateTime::now_utc().unix_timestamp();
let meta = fetch_meta(&store, &meta_key).await?;
match (rollout_exists, meta) {
(true, Some(meta)) => {
if meta.owner != owner {
return Err(anyhow::anyhow!(
"remote session already exists and belongs to another user"
));
}
store
.put_object_file(&key, rollout_path, "application/x-ndjson")
.await
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
let updated = SessionShareMeta {
owner: meta.owner,
created_at: meta.created_at,
updated_at: now,
};
upload_meta(&store, &meta_key, &updated).await?;
}
(true, None) => {
return Err(anyhow::anyhow!(
"remote session already exists but has no metadata; refusing to overwrite"
));
}
(false, Some(meta)) => {
if meta.owner != owner {
return Err(anyhow::anyhow!(
"remote session metadata already exists and belongs to another user"
));
}
store
.put_object_file(&key, rollout_path, "application/x-ndjson")
.await
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
let updated = SessionShareMeta {
owner: meta.owner,
created_at: meta.created_at,
updated_at: now,
};
upload_meta(&store, &meta_key, &updated).await?;
}
(false, None) => {
let meta = SessionShareMeta {
owner: owner.to_string(),
created_at: now,
updated_at: now,
};
let created = create_meta_if_absent(&store, &meta_key, &meta).await?;
if created {
store
.put_object_file(&key, rollout_path, "application/x-ndjson")
.await
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
} else {
let meta = fetch_meta(&store, &meta_key).await?.ok_or_else(|| {
anyhow::anyhow!("failed to atomically create share metadata; try again")
})?;
if meta.owner != owner {
return Err(anyhow::anyhow!(
"remote session metadata already exists and belongs to another user"
));
}
store
.put_object_file(&key, rollout_path, "application/x-ndjson")
.await
.with_context(|| format!("failed to upload rollout for id {session_id}"))?;
let updated = SessionShareMeta {
owner: meta.owner,
created_at: meta.created_at,
updated_at: now,
};
upload_meta(&store, &meta_key, &updated).await?;
}
}
}
let object_url = store.object_url(&key)?;
Ok(SessionShareResult {
remote_id: session_id,
object_url,
})
}
/// Downloads a shared rollout into the local sessions directory.
///
/// Returns `Ok(None)` when the remote rollout object does not exist. On
/// success the remote share metadata (if present) is written as a sidecar
/// next to the rollout; when the remote has no metadata, any stale local
/// sidecar is removed so the session reads as unshared.
///
/// Failures after the download delete the just-written files so no
/// unverified rollout is left behind.
pub async fn download_rollout_if_available(
    base_url: &str,
    session_id: ThreadId,
    codex_home: &Path,
) -> anyhow::Result<Option<PathBuf>> {
    let store = SessionObjectStore::new(base_url).await?;
    let key = object_key(session_id);
    let meta_key = meta_key(session_id);
    let path = build_rollout_download_path(codex_home, session_id)?;
    if !store.get_object_to_file(&key, &path).await? {
        return Ok(None);
    }
    let meta_path = share_meta_path_for_rollout_path(&path);
    // A metadata fetch error is fatal: remove the downloaded rollout rather
    // than keep a file whose ownership we could not verify.
    let meta = match fetch_meta(&store, &meta_key).await {
        Ok(meta) => meta,
        Err(err) => {
            cleanup_downloaded_rollout(&path, &meta_path).await;
            return Err(err);
        }
    };
    match meta {
        Some(meta) => {
            // NOTE(review): a serialization failure on the next line returns
            // without cleanup, leaving the downloaded rollout — confirm intended.
            let payload =
                serde_json::to_vec(&meta).with_context(|| "failed to serialize metadata")?;
            if let Err(err) = tokio::fs::write(&meta_path, payload)
                .await
                .with_context(|| format!("failed to write share metadata {}", meta_path.display()))
            {
                cleanup_downloaded_rollout(&path, &meta_path).await;
                return Err(err);
            }
        }
        None => {
            // No remote metadata: drop any stale local sidecar (best effort).
            let _ = tokio::fs::remove_file(&meta_path).await;
        }
    }
    Ok(Some(path))
}
/// Remote key of the rollout payload object for `id`.
fn object_key(id: ThreadId) -> String {
    let mut key = String::from(SHARE_OBJECT_PREFIX);
    key.push('/');
    key.push_str(&id.to_string());
    key.push_str(SHARE_OBJECT_SUFFIX);
    key
}

/// Remote key of the ownership-metadata sidecar object for `id`.
fn meta_key(id: ThreadId) -> String {
    let mut key = String::from(SHARE_OBJECT_PREFIX);
    key.push('/');
    key.push_str(&id.to_string());
    key.push_str(SHARE_META_SUFFIX);
    key
}
/// Reads the owner recorded in the local share-metadata sidecar, if any.
///
/// `Ok(None)` means the sidecar file does not exist — the rollout was never
/// shared from this machine. Any other read or parse failure is an error.
pub fn local_share_owner(rollout_path: &Path) -> anyhow::Result<Option<String>> {
    let meta_path = share_meta_path_for_rollout_path(rollout_path);
    match std::fs::read(&meta_path) {
        Ok(bytes) => {
            let meta: SessionShareMeta = serde_json::from_slice(&bytes)
                .with_context(|| "failed to parse session share metadata")?;
            Ok(Some(meta.owner))
        }
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err)
            .with_context(|| format!("failed to read share metadata {}", meta_path.display())),
    }
}
/// Fetches and parses the share metadata object; `None` when it is absent.
async fn fetch_meta(
    store: &SessionObjectStore,
    key: &str,
) -> anyhow::Result<Option<SessionShareMeta>> {
    match store.get_object_bytes(key).await? {
        None => Ok(None),
        Some(bytes) => serde_json::from_slice(&bytes)
            .map(Some)
            .with_context(|| "failed to parse session share metadata"),
    }
}

/// Serializes `meta` and writes it unconditionally under `key`.
async fn upload_meta(
    store: &SessionObjectStore,
    key: &str,
    meta: &SessionShareMeta,
) -> anyhow::Result<()> {
    let payload = serde_json::to_vec(meta).with_context(|| "failed to serialize metadata")?;
    store.put_object(key, payload, "application/json").await
}

/// Writes `meta` only when no object exists under `key`; returns whether this
/// write won the race.
async fn create_meta_if_absent(
    store: &SessionObjectStore,
    key: &str,
    meta: &SessionShareMeta,
) -> anyhow::Result<bool> {
    let payload = serde_json::to_vec(meta).with_context(|| "failed to serialize metadata")?;
    store
        .put_object_if_absent(key, payload, "application/json")
        .await
}
/// Computes the local path for a downloaded rollout, mirroring the regular
/// session layout:
/// `<codex_home>/<sessions>/<year>/<month>/<day>/rollout-<timestamp>-<id>.jsonl`.
fn build_rollout_download_path(codex_home: &Path, session_id: ThreadId) -> anyhow::Result<PathBuf> {
    // Prefer local time for the directory layout; fall back to UTC when the
    // local offset cannot be determined (now_local is fallible).
    let timestamp = OffsetDateTime::now_local().unwrap_or_else(|_| OffsetDateTime::now_utc());
    let format: &[FormatItem] =
        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
    let date_str = timestamp
        .format(format)
        .map_err(|e| anyhow::anyhow!("failed to format timestamp: {e}"))?;
    let mut dir = codex_home.to_path_buf();
    dir.push(crate::rollout::SESSIONS_SUBDIR);
    dir.push(timestamp.year().to_string());
    // Month and day are zero-padded to keep directory names sortable.
    dir.push(format!("{:02}", u8::from(timestamp.month())));
    dir.push(format!("{:02}", timestamp.day()));
    let filename = format!("rollout-{date_str}-{session_id}.jsonl");
    Ok(dir.join(filename))
}
/// Path of the local share-metadata sidecar for the given rollout file:
/// the rollout's file name with `.share-meta.json` appended.
fn share_meta_path_for_rollout_path(path: &Path) -> PathBuf {
    let mut name = path
        .file_name()
        .map(OsString::from)
        .unwrap_or_else(|| OsString::from("session"));
    name.push(".share-meta.json");
    path.with_file_name(name)
}
impl HttpObjectStore {
    /// Builds the absolute URL for `key`, re-attaching the base URL's query
    /// string (e.g. a pre-signed token) after the join.
    fn object_url(&self, key: &str) -> anyhow::Result<Url> {
        let mut base = self.base_url.clone();
        // `Url::join` would drop the query, so stash it and restore afterwards.
        let query = base.query().map(str::to_string);
        base.set_query(None);
        let mut joined = base
            .join(key)
            .with_context(|| format!("failed to build object URL for key {key}"))?;
        if let Some(query) = query {
            joined.set_query(Some(&query));
        }
        Ok(joined)
    }

    /// Existence probe via HEAD, falling back to a ranged GET for servers
    /// that do not support HEAD (405/501).
    async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self.client.head(url).send().await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(false),
            StatusCode::METHOD_NOT_ALLOWED | StatusCode::NOT_IMPLEMENTED => {
                self.object_exists_via_get(key).await
            }
            status if status.is_success() => Ok(true),
            status => Err(anyhow::anyhow!(
                "object store HEAD failed with status {status}"
            )),
        }
    }

    /// Fallback existence probe: GET with `Range: bytes=0-0` so at most one
    /// byte of body is transferred.
    async fn object_exists_via_get(&self, key: &str) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self
            .client
            .get(url)
            .header(reqwest::header::RANGE, "bytes=0-0")
            .send()
            .await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(false),
            // 200 covers servers that ignore the Range header.
            StatusCode::PARTIAL_CONTENT | StatusCode::OK => Ok(true),
            status => Err(anyhow::anyhow!(
                "object store GET probe failed with status {status}"
            )),
        }
    }

    /// Unconditional PUT of an in-memory payload.
    async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
        let url = self.object_url(key)?;
        let response = self
            .client
            .put(url)
            .header(reqwest::header::CONTENT_TYPE, content_type)
            .body(data)
            .send()
            .await?;
        if response.status().is_success() {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "object store PUT failed with status {}",
                response.status()
            ))
        }
    }

    /// Conditional create via `If-None-Match: *`; `Ok(false)` when the object
    /// already existed (412 or 409 from the server).
    async fn put_object_if_absent(
        &self,
        key: &str,
        data: Vec<u8>,
        content_type: &str,
    ) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self
            .client
            .put(url)
            .header(reqwest::header::IF_NONE_MATCH, "*")
            .header(reqwest::header::CONTENT_TYPE, content_type)
            .body(data)
            .send()
            .await?;
        match response.status() {
            status if status.is_success() => Ok(true),
            StatusCode::PRECONDITION_FAILED | StatusCode::CONFLICT => Ok(false),
            status => Err(anyhow::anyhow!(
                "object store conditional PUT failed with status {status}"
            )),
        }
    }

    /// Streams a file from disk as the PUT body, avoiding buffering the whole
    /// rollout in memory.
    async fn put_object_file(
        &self,
        key: &str,
        path: &Path,
        content_type: &str,
    ) -> anyhow::Result<()> {
        let url = self.object_url(key)?;
        let file = tokio::fs::File::open(path)
            .await
            .with_context(|| format!("failed to open rollout at {}", path.display()))?;
        let stream = ReaderStream::new(file);
        let response = self
            .client
            .put(url)
            .header(reqwest::header::CONTENT_TYPE, content_type)
            .body(reqwest::Body::wrap_stream(stream))
            .send()
            .await?;
        if response.status().is_success() {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "object store PUT failed with status {}",
                response.status()
            ))
        }
    }

    /// Reads the whole object into memory; `None` on 404.
    async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        let url = self.object_url(key)?;
        let response = self.client.get(url).send().await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(None),
            status if status.is_success() => {
                let bytes = response.bytes().await?;
                Ok(Some(bytes.to_vec()))
            }
            status => Err(anyhow::anyhow!(
                "object store GET failed with status {status}"
            )),
        }
    }

    /// Streams the object to `path`; `false` on 404.
    async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self.client.get(url).send().await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(false),
            status if status.is_success() => {
                write_response_to_file(path, response).await?;
                Ok(true)
            }
            status => Err(anyhow::anyhow!(
                "object store GET failed with status {status}"
            )),
        }
    }
}
impl AzureObjectStore {
    /// Builds a client from an `https://{account}.blob.core.windows.net/...`
    /// URL. A query string is treated as a SAS token and reused verbatim on
    /// every request; otherwise requests are authorized with a bearer token
    /// from the Azure CLI credential.
    fn new(url: &Url) -> anyhow::Result<Self> {
        let endpoint = azure_endpoint(url)?;
        let (container, prefix) = azure_container_and_prefix(url)?;
        let sas_query = url.query().map(str::to_string);
        let credential = if sas_query.is_some() {
            None
        } else {
            let credential: Arc<dyn TokenCredential> = Arc::new(AzureCliCredential::new());
            Some(credential)
        };
        Ok(Self {
            endpoint,
            container,
            prefix,
            sas_query,
            client: reqwest::Client::new(),
            credential,
        })
    }

    /// Builds a client from an `az://account/container[/prefix]` URL.
    // NOTE(review): the SAS/credential setup duplicates `new` — consider a
    // shared helper.
    fn new_from_az(url: &Url) -> anyhow::Result<Self> {
        let account = url
            .host_str()
            .ok_or_else(|| anyhow::anyhow!("az url missing account name"))?;
        let endpoint = azure_endpoint_for_account(account)?;
        let (container, prefix) = azure_container_and_prefix(url)?;
        let sas_query = url.query().map(str::to_string);
        let credential = if sas_query.is_some() {
            None
        } else {
            let credential: Arc<dyn TokenCredential> = Arc::new(AzureCliCredential::new());
            Some(credential)
        };
        Ok(Self {
            endpoint,
            container,
            prefix,
            sas_query,
            client: reqwest::Client::new(),
            credential,
        })
    }

    /// Blob URL for `key`: endpoint + container + optional prefix, with the
    /// SAS query (when configured) re-attached.
    fn object_url(&self, key: &str) -> anyhow::Result<Url> {
        let full_key = join_prefix(&self.prefix, key);
        let mut url = self.endpoint.clone();
        if full_key.is_empty() {
            url.set_path(&format!("/{}", self.container));
        } else {
            url.set_path(&format!("/{}/{}", self.container, full_key));
        }
        if let Some(query) = &self.sas_query {
            url.set_query(Some(query));
        }
        Ok(url)
    }

    /// Existence probe via HEAD (Get Blob Properties).
    async fn object_exists(&self, key: &str) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self
            .authorized_request(self.client.head(url))
            .await?
            .send()
            .await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(false),
            status if status.is_success() => Ok(true),
            status => Err(anyhow::anyhow!(
                "azure blob HEAD failed with status {status}{}",
                azure_response_context(response.headers())
            )),
        }
    }

    /// Unconditional Put Blob (block blob) of an in-memory payload.
    async fn put_object(&self, key: &str, data: Vec<u8>, content_type: &str) -> anyhow::Result<()> {
        let url = self.object_url(key)?;
        let response = self
            .authorized_request(
                self.client
                    .put(url)
                    .header("x-ms-blob-type", "BlockBlob")
                    .header(reqwest::header::CONTENT_TYPE, content_type)
                    .body(data),
            )
            .await?
            .send()
            .await?;
        if response.status().is_success() {
            Ok(())
        } else {
            // Include Azure diagnostic headers and a body snippet for triage.
            let status = response.status();
            let headers = azure_response_context(response.headers());
            let body = response.text().await.unwrap_or_default();
            let body_snippet = azure_response_body_snippet(&body);
            Err(anyhow::anyhow!(
                "azure blob PUT failed with status {status}{headers}{body_snippet}"
            ))
        }
    }

    /// Conditional Put Blob via `If-None-Match: *`; `Ok(false)` on 412
    /// (the blob already existed).
    async fn put_object_if_absent(
        &self,
        key: &str,
        data: Vec<u8>,
        content_type: &str,
    ) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self
            .authorized_request(
                self.client
                    .put(url)
                    .header("x-ms-blob-type", "BlockBlob")
                    .header(reqwest::header::IF_NONE_MATCH, "*")
                    .header(reqwest::header::CONTENT_TYPE, content_type)
                    .body(data),
            )
            .await?
            .send()
            .await?;
        match response.status() {
            status if status.is_success() => Ok(true),
            StatusCode::PRECONDITION_FAILED => Ok(false),
            status => Err(anyhow::anyhow!(
                "azure blob conditional PUT failed with status {status}{}",
                azure_response_context(response.headers())
            )),
        }
    }

    /// Streams a file from disk as the Put Blob body.
    async fn put_object_file(
        &self,
        key: &str,
        path: &Path,
        content_type: &str,
    ) -> anyhow::Result<()> {
        let url = self.object_url(key)?;
        let file = tokio::fs::File::open(path)
            .await
            .with_context(|| format!("failed to open rollout at {}", path.display()))?;
        let stream = ReaderStream::new(file);
        let response = self
            .authorized_request(
                self.client
                    .put(url)
                    .header("x-ms-blob-type", "BlockBlob")
                    .header(reqwest::header::CONTENT_TYPE, content_type)
                    .body(reqwest::Body::wrap_stream(stream)),
            )
            .await?
            .send()
            .await?;
        if response.status().is_success() {
            Ok(())
        } else {
            let status = response.status();
            let headers = azure_response_context(response.headers());
            let body = response.text().await.unwrap_or_default();
            let body_snippet = azure_response_body_snippet(&body);
            Err(anyhow::anyhow!(
                "azure blob PUT failed with status {status}{headers}{body_snippet}"
            ))
        }
    }

    /// Reads the whole blob into memory; `None` on 404.
    async fn get_object_bytes(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        let url = self.object_url(key)?;
        let response = self
            .authorized_request(self.client.get(url))
            .await?
            .send()
            .await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(None),
            status if status.is_success() => {
                let bytes = response.bytes().await?;
                Ok(Some(bytes.to_vec()))
            }
            status => Err(anyhow::anyhow!(
                "azure blob GET failed with status {status}{}",
                azure_response_context(response.headers())
            )),
        }
    }

    /// Streams the blob to `path`; `false` on 404.
    async fn get_object_to_file(&self, key: &str, path: &Path) -> anyhow::Result<bool> {
        let url = self.object_url(key)?;
        let response = self
            .authorized_request(self.client.get(url))
            .await?
            .send()
            .await?;
        match response.status() {
            StatusCode::NOT_FOUND => Ok(false),
            status if status.is_success() => {
                write_response_to_file(path, response).await?;
                Ok(true)
            }
            status => Err(anyhow::anyhow!(
                "azure blob GET failed with status {status}{}",
                azure_response_context(response.headers())
            )),
        }
    }
}
/// Appends a trailing slash to the URL path (if not already present) so that
/// `Url::join` extends the path instead of replacing its last segment. An
/// empty path becomes `/`.
fn ensure_trailing_slash(url: &mut Url) {
    let path = url.path().to_string();
    if path.ends_with('/') {
        return;
    }
    let new_path = if path.is_empty() {
        "/".to_string()
    } else {
        format!("{path}/")
    };
    url.set_path(&new_path);
}
/// Joins an optional key prefix and a key with `/`; an empty prefix yields
/// the key unchanged.
fn join_prefix(prefix: &str, key: &str) -> String {
    match prefix {
        "" => key.to_owned(),
        _ => [prefix, key].join("/"),
    }
}
/// Whether an http(s) URL points at Azure Blob Storage, detected by the
/// well-known `*.blob.core.windows.net` host suffix.
fn is_azure_blob_url(url: &Url) -> bool {
    url.host_str()
        .is_some_and(|host| host.ends_with(".blob.core.windows.net"))
}
/// Derives the account endpoint from a full blob URL by stripping path,
/// query, and fragment.
fn azure_endpoint(url: &Url) -> anyhow::Result<Url> {
    let mut endpoint = url.clone();
    endpoint.set_query(None);
    endpoint.set_fragment(None);
    endpoint.set_path("/");
    Ok(endpoint)
}
/// Splits a blob URL's path into (container, key prefix), ignoring empty
/// path segments.
fn azure_container_and_prefix(url: &Url) -> anyhow::Result<(String, String)> {
    let mut segments: Vec<&str> = Vec::new();
    if let Some(iter) = url.path_segments() {
        segments.extend(iter.filter(|segment| !segment.is_empty()));
    }
    azure_container_and_prefix_from_segments(&segments)
}
/// First segment is the container; the rest (possibly empty) joins into the
/// key prefix. Errors when there is no container segment at all.
fn azure_container_and_prefix_from_segments(segments: &[&str]) -> anyhow::Result<(String, String)> {
    let Some((container, rest)) = segments.split_first() else {
        return Err(anyhow::anyhow!(
            "azure blob url must include a container name"
        ));
    };
    Ok(((*container).to_string(), rest.join("/")))
}
/// Stamps the Azure Storage REST API version header onto an outgoing request.
fn azure_request(builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
    builder.header("x-ms-version", "2021-08-06")
}
/// Builds the public blob endpoint URL for a storage account name.
fn azure_endpoint_for_account(account: &str) -> anyhow::Result<Url> {
    Url::parse(&format!("https://{account}.blob.core.windows.net/"))
        .with_context(|| "failed to build azure blob endpoint")
}
impl AzureObjectStore {
    /// Decorates a request with the Azure API version header and, when no SAS
    /// token is configured, a bearer token acquired from the CLI credential.
    async fn authorized_request(
        &self,
        builder: reqwest::RequestBuilder,
    ) -> anyhow::Result<reqwest::RequestBuilder> {
        let builder = azure_request(builder);
        // SAS URLs carry their own authorization; skip token acquisition.
        let Some(credential) = &self.credential else {
            return Ok(builder);
        };
        let token = credential
            .get_token(&[AZURE_STORAGE_SCOPE])
            .await
            .with_context(|| "failed to acquire azure blob access token")?;
        Ok(builder.bearer_auth(token.token.secret()))
    }
}
/// Formats Azure diagnostic headers (error code, request id, auth challenge)
/// into a ` (k=v, ...)` suffix for error messages; empty when none are present.
fn azure_response_context(headers: &reqwest::header::HeaderMap) -> String {
    let parts: Vec<String> = ["x-ms-error-code", "x-ms-request-id", "www-authenticate"]
        .into_iter()
        .filter_map(|name| azure_header_value(headers, name).map(|value| format!("{name}={value}")))
        .collect();
    if parts.is_empty() {
        String::new()
    } else {
        format!(" ({})", parts.join(", "))
    }
}
/// Reads a header as an owned `String`, returning `None` when the header
/// is absent or its value is not valid visible-ASCII text.
fn azure_header_value(headers: &reqwest::header::HeaderMap, name: &str) -> Option<String> {
    let raw = headers.get(name)?;
    let text = raw.to_str().ok()?;
    Some(text.to_string())
}
/// Formats a (possibly long) response body for inclusion in an error
/// message, as `" (body=...)"` with a leading space.
///
/// The trimmed body is capped at 512 characters; anything longer gets a
/// `"..."` suffix. An empty (or whitespace-only) body yields an empty
/// string so callers can append the result unconditionally.
///
/// Fix: the old code compared `trimmed.len()` (bytes) but truncated by
/// `chars()`, so multi-byte UTF-8 bodies of <= 512 chars were wrongly
/// marked as truncated. Truncation is now decided in characters.
fn azure_response_body_snippet(body: &str) -> String {
    let trimmed = body.trim();
    if trimmed.is_empty() {
        return String::new();
    }
    let mut chars = trimmed.chars();
    let snippet: String = chars.by_ref().take(512).collect();
    // Only flag truncation when characters were actually dropped.
    let snippet = if chars.next().is_some() {
        format!("{snippet}...")
    } else {
        snippet
    };
    format!(" (body={snippet})")
}
/// Streams an HTTP response body to `path`, creating parent directories
/// as needed.
///
/// If any step after file creation fails (a chunk error, a short write,
/// or a failed flush), the partially written file is removed best-effort
/// before the error is returned, so callers never observe a truncated
/// download on disk.
///
/// Fix: dropped the needless `path.to_path_buf()` — the borrowed `&Path`
/// is sufficient for the whole body, avoiding an allocation per call.
async fn write_response_to_file(path: &Path, response: reqwest::Response) -> anyhow::Result<()> {
    let parent = path
        .parent()
        .ok_or_else(|| anyhow::anyhow!("failed to resolve rollout directory"))?;
    tokio::fs::create_dir_all(parent)
        .await
        .with_context(|| format!("failed to create rollout directory {}", parent.display()))?;
    // Run the fallible write in an inner block so the cleanup below sees
    // every failure mode uniformly.
    let result = async {
        let mut file = tokio::fs::File::create(path)
            .await
            .with_context(|| format!("failed to create rollout file {}", path.display()))?;
        let mut stream = response.bytes_stream();
        while let Some(chunk) = stream.next().await {
            let chunk = chunk?;
            file.write_all(&chunk)
                .await
                .with_context(|| format!("failed to write rollout file {}", path.display()))?;
        }
        file.flush()
            .await
            .with_context(|| format!("failed to flush rollout file {}", path.display()))?;
        Ok(())
    }
    .await;
    if let Err(err) = result {
        // Best-effort cleanup; the original error is what the caller needs.
        let _ = tokio::fs::remove_file(path).await;
        return Err(err);
    }
    Ok(())
}
/// Best-effort removal of a downloaded rollout file and its share
/// metadata sidecar. Failures are ignored: either file may never have
/// been created.
async fn cleanup_downloaded_rollout(path: &Path, meta_path: &Path) {
    for target in [path, meta_path] {
        let _ = tokio::fs::remove_file(target).await;
    }
}

View File

@@ -73,6 +73,8 @@ ignore = [
{ id = "RUSTSEC-2024-0388", reason = "derivative is unmaintained; pulled in via starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" },
{ id = "RUSTSEC-2025-0057", reason = "fxhash is unmaintained; pulled in via starlark_map/starlark v0.13.0 used by execpolicy/cli/core; no fixed release yet" },
{ id = "RUSTSEC-2024-0436", reason = "paste is unmaintained; pulled in via ratatui/rmcp/starlark used by tui/execpolicy; no fixed release yet" },
{ id = "RUSTSEC-2025-0134", reason = "rustls-pemfile is unmaintained; pulled in via rama-tls-rustls used by codex-network-proxy; no safe upgrade until rama removes the dependency" },
{ id = "RUSTSEC-2024-0384", reason = "instant is unmaintained; pulled in via http-types -> futures-lite used by Azure auth; no safe upgrade available yet" },
# TODO(joshka, nornagon): remove this exception once we update the ratatui fork to a version that uses lru 0.13+.
{ id = "RUSTSEC-2026-0002", reason = "lru 0.12.5 is pulled in via ratatui fork; cannot upgrade until the fork is updated" },
]

View File

@@ -55,6 +55,8 @@ use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::SessionSource;
use codex_core::protocol::SkillErrorInfo;
use codex_core::protocol::TokenUsage;
use codex_core::read_session_meta_line;
use codex_core::session_share::local_share_owner;
#[cfg(target_os = "windows")]
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_otel::OtelManager;
@@ -1409,6 +1411,46 @@ impl App {
AppEvent::OpenResumePicker => {
match crate::resume_picker::run_resume_picker(tui, &self.config, false).await? {
SessionSelection::Resume(path) => {
let current_owner = self
.auth_manager
.auth_cached()
.and_then(|auth| auth.get_account_email());
match local_share_owner(&path) {
Ok(Some(owner)) => {
if current_owner.as_deref() != Some(owner.as_str()) {
let session_id = read_session_meta_line(&path)
.await
.ok()
.map(|meta| meta.meta.id.to_string());
let (id_display, fork_hint) = match session_id {
Some(id) => {
(id.clone(), format!("Use `codex fork {id}` instead."))
}
None => (
"this session".to_string(),
"Use `codex fork` to select it instead.".to_string(),
),
};
self.chat_widget.add_error_message(format!(
"Cannot resume shared session {id_display} owned by {owner}. {fork_hint}"
));
return Ok(AppRunControl::Continue);
}
}
Ok(None) => {}
Err(err) => {
tracing::warn!(
error = %err,
"failed to read shared session metadata; treating session as unshared"
);
self.chat_widget.add_info_message(
"Share metadata unreadable; proceeding without owner check."
.to_string(),
None,
);
}
}
let current_cwd = self.config.cwd.clone();
let resume_cwd = match crate::resolve_cwd_for_resume_or_fork(
tui,
@@ -1416,6 +1458,7 @@ impl App {
&path,
CwdPromptAction::Resume,
true,
false,
)
.await?
{
@@ -1634,7 +1677,16 @@ impl App {
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
}
AppEvent::CodexOp(op) => {
self.chat_widget.submit_op(op);
if let Op::UserInputAnswer { id, response } = op {
if crate::chatwidget::ChatWidget::is_share_request_id(&id) {
self.chat_widget.handle_share_response(response);
return Ok(AppRunControl::Continue);
}
self.chat_widget
.submit_op(Op::UserInputAnswer { id, response });
} else {
self.chat_widget.submit_op(op);
}
}
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane

View File

@@ -128,7 +128,11 @@ use codex_protocol::items::AgentMessageItem;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::local_image_label_text;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputEvent;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::TextElement;
use codex_protocol::user_input::UserInput;
use codex_utils_sleep_inhibitor::SleepInhibitor;
@@ -157,6 +161,11 @@ const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan";
const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode";
const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan.";
const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
const SHARE_REQUEST_PREFIX: &str = "share:";
const SHARE_SCOPE_QUESTION_ID: &str = "share_scope";
const SHARE_EMAILS_QUESTION_ID: &str = "share_emails";
const SHARE_SCOPE_EVERYONE_LABEL: &str = "Everyone using the same blob store";
const SHARE_SCOPE_EMAIL_LABEL: &str = "Specific emails";
use crate::app_event::AppEvent;
use crate::app_event::ConnectorsSnapshot;
@@ -404,6 +413,95 @@ pub(crate) fn get_limits_duration(windows_minutes: i64) -> String {
}
}
#[derive(Debug, Clone, Copy)]
enum ShareScope {
Everyone,
Emails,
}
impl ShareScope {
fn label(self) -> &'static str {
match self {
ShareScope::Everyone => SHARE_SCOPE_EVERYONE_LABEL,
ShareScope::Emails => SHARE_SCOPE_EMAIL_LABEL,
}
}
}
fn parse_share_scope(response: &RequestUserInputResponse) -> ShareScope {
response
.answers
.get(SHARE_SCOPE_QUESTION_ID)
.and_then(selected_label)
.map(|label| {
if label == SHARE_SCOPE_EMAIL_LABEL {
ShareScope::Emails
} else {
ShareScope::Everyone
}
})
.unwrap_or(ShareScope::Everyone)
}
fn parse_share_emails(response: &RequestUserInputResponse) -> Vec<String> {
let note = response
.answers
.get(SHARE_EMAILS_QUESTION_ID)
.and_then(extract_user_note);
let Some(note) = note else {
return Vec::new();
};
note.split(',')
.map(str::trim)
.filter(|entry| !entry.is_empty())
.map(ToString::to_string)
.collect()
}
fn selected_label(answer: &RequestUserInputAnswer) -> Option<String> {
answer
.answers
.iter()
.find(|entry| !entry.starts_with("user_note: "))
.cloned()
}
fn extract_user_note(answer: &RequestUserInputAnswer) -> Option<String> {
answer
.answers
.iter()
.find_map(|entry| entry.strip_prefix("user_note: ").map(ToString::to_string))
}
fn share_success_lines(
remote_id: ThreadId,
scope: ShareScope,
emails: &[String],
) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(vec!["".dim(), "Session shared.".into()].into());
lines.push(vec![" ".into(), "Access: ".dim(), scope.label().into()].into());
if matches!(scope, ShareScope::Emails) {
let email_list = if emails.is_empty() {
"(none provided)".to_string()
} else {
emails.join(", ")
};
lines.push(vec![" ".into(), "Emails: ".dim(), email_list.into()].into());
}
let fork_command = format!("codex fork {remote_id}");
lines.push(vec![" ".into(), "Fork with: ".dim(), fork_command.cyan()].into());
lines.push(
vec![
" ".into(),
"Remote session id: ".dim(),
remote_id.to_string().cyan(),
]
.into(),
);
lines
}
/// Common initialization parameters shared by all `ChatWidget` constructors.
pub(crate) struct ChatWidgetInit {
pub(crate) config: Config,
@@ -3262,6 +3360,9 @@ impl ChatWidget {
SlashCommand::Fork => {
self.app_event_tx.send(AppEvent::ForkCurrentSession);
}
SlashCommand::Share => {
self.start_share_flow();
}
SlashCommand::Init => {
let init_target = self.config.cwd.join(DEFAULT_PROJECT_DOC_FILENAME);
if init_target.exists() {
@@ -3632,6 +3733,139 @@ impl ChatWidget {
self.bottom_pane.show_view(Box::new(view));
}
fn start_share_flow(&mut self) {
let Some(thread_id) = self.thread_id else {
self.add_error_message("Current session is not ready to share yet.".to_string());
return;
};
let Some(rollout_path) = self.rollout_path() else {
self.add_error_message("Current session is not ready to share yet.".to_string());
return;
};
if !rollout_path.exists() {
self.add_error_message("Current session is not ready to share yet.".to_string());
return;
}
if self.config.session_object_storage_url.is_none()
&& self.config.session_object_storage_url_cmd.is_none()
{
self.add_error_message(
"Sharing requires `session_object_storage_url` or `session_object_storage_url_cmd` in config.toml."
.to_string(),
);
return;
}
let request_id = format!("{SHARE_REQUEST_PREFIX}{thread_id}");
let questions = vec![
RequestUserInputQuestion {
id: SHARE_SCOPE_QUESTION_ID.to_string(),
header: "Share".to_string(),
question: "Who should be able to open this shared session?".to_string(),
is_secret: false,
is_other: false,
options: Some(vec![
RequestUserInputQuestionOption {
label: SHARE_SCOPE_EVERYONE_LABEL.to_string(),
description: "Anyone with access to the same object store.".to_string(),
},
RequestUserInputQuestionOption {
label: SHARE_SCOPE_EMAIL_LABEL.to_string(),
description: "Restrict to a list of emails (not enforced yet).".to_string(),
},
]),
},
RequestUserInputQuestion {
id: SHARE_EMAILS_QUESTION_ID.to_string(),
header: "Emails".to_string(),
question: "If sharing with specific emails, list them (comma-separated)."
.to_string(),
is_secret: false,
is_other: false,
options: None,
},
];
self.bottom_pane
.push_user_input_request(RequestUserInputEvent {
call_id: request_id.clone(),
turn_id: request_id,
questions,
});
self.request_redraw();
}
pub(crate) fn is_share_request_id(id: &str) -> bool {
id.starts_with(SHARE_REQUEST_PREFIX)
}
pub(crate) fn handle_share_response(&mut self, response: RequestUserInputResponse) {
let scope = parse_share_scope(&response);
let emails = parse_share_emails(&response);
// TODO: Enforce email-based access once we have a server-side auth check.
let owner = self
.auth_manager
.auth_cached()
.and_then(|auth| auth.get_account_email());
let Some(owner) = owner else {
self.add_error_message(
"Sharing requires a signed-in ChatGPT account with an email.".to_string(),
);
return;
};
let Some(thread_id) = self.thread_id else {
self.add_error_message("Current session is not ready to share yet.".to_string());
return;
};
let Some(rollout_path) = self.rollout_path() else {
self.add_error_message("Current session is not ready to share yet.".to_string());
return;
};
self.add_info_message("Sharing current session…".to_string(), None);
let config = self.config.clone();
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
let storage_url = match config.resolve_session_object_storage_url().await {
Ok(Some(url)) => url,
Ok(None) => {
let cell = history_cell::new_error_event(
"Sharing requires `session_object_storage_url` or `session_object_storage_url_cmd` in config.toml."
.to_string(),
);
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
return;
}
Err(err) => {
let cell = history_cell::new_error_event(format!(
"Failed to resolve session object storage URL: {err}"
));
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
return;
}
};
let result = codex_core::session_share::upload_rollout_with_owner(
&storage_url,
thread_id,
&owner,
&rollout_path,
)
.await;
let cell = match result {
Ok(result) => {
let lines = share_success_lines(result.remote_id, scope, &emails);
PlainHistoryCell::new(lines)
}
Err(err) => {
history_cell::new_error_event(format!("Failed to share session: {err}"))
}
};
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
});
}
pub(crate) fn handle_paste(&mut self, text: String) {
self.bottom_pane.handle_paste(text);
}

View File

@@ -32,8 +32,11 @@ use codex_core::format_exec_policy_error_with_source;
use codex_core::path_utils;
use codex_core::protocol::AskForApproval;
use codex_core::read_session_meta_line;
use codex_core::session_share::download_rollout_if_available;
use codex_core::session_share::local_share_owner;
use codex_core::terminal::Multiplexer;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_protocol::ThreadId;
use codex_protocol::config_types::AltScreenMode;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::WindowsSandboxLevel;
@@ -49,6 +52,7 @@ use std::fs::OpenOptions;
use std::path::Path;
use std::path::PathBuf;
use tracing::error;
use tracing::warn;
use tracing_appender::non_blocking;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
@@ -544,8 +548,8 @@ async fn run_ratatui_app(
} else {
initial_config
};
let mut missing_session_exit = |id_str: &str, action: &str| {
error!("Error finding conversation path: {id_str}");
let fatal_exit = |tui: &mut Tui, message: String| {
error!("{message}");
restore();
session_log::log_session_end();
let _ = tui.terminal.clear();
@@ -554,13 +558,20 @@ async fn run_ratatui_app(
thread_id: None,
thread_name: None,
update_action: None,
exit_reason: ExitReason::Fatal(format!(
"No saved session found with ID {id_str}. Run `codex {action}` without an ID to choose from existing sessions."
)),
exit_reason: ExitReason::Fatal(message),
})
};
let missing_session_exit = |tui: &mut Tui, id_str: &str, action: &str| {
fatal_exit(
tui,
format!(
"No saved session found with ID {id_str}. Run `codex {action}` without an ID to choose from existing sessions."
),
)
};
let use_fork = cli.fork_picker || cli.fork_last || cli.fork_session_id.is_some();
let mut remote_fork_downloaded = false;
let session_selection = if use_fork {
if let Some(id_str) = cli.fork_session_id.as_deref() {
let is_uuid = Uuid::parse_str(id_str).is_ok();
@@ -571,7 +582,40 @@ async fn run_ratatui_app(
};
match path {
Some(path) => resume_picker::SessionSelection::Fork(path),
None => return missing_session_exit(id_str, "fork"),
None => {
let storage_url = match config.resolve_session_object_storage_url().await {
Ok(Some(url)) => url,
Ok(None) => return missing_session_exit(&mut tui, id_str, "fork"),
Err(err) => {
return fatal_exit(
&mut tui,
format!("Failed to resolve session object storage URL: {err}"),
);
}
};
let Ok(session_id) = ThreadId::from_string(id_str) else {
return missing_session_exit(&mut tui, id_str, "fork");
};
match download_rollout_if_available(
&storage_url,
session_id,
&config.codex_home,
)
.await
{
Ok(Some(path)) => {
remote_fork_downloaded = true;
resume_picker::SessionSelection::Fork(path)
}
Ok(None) => return missing_session_exit(&mut tui, id_str, "fork"),
Err(err) => {
return fatal_exit(
&mut tui,
format!("Failed to fetch remote session {id_str}: {err}"),
);
}
}
}
}
} else if cli.fork_last {
let provider_filter = vec![config.model_provider_id.clone()];
@@ -620,7 +664,7 @@ async fn run_ratatui_app(
};
match path {
Some(path) => resume_picker::SessionSelection::Resume(path),
None => return missing_session_exit(id_str, "resume"),
None => return missing_session_exit(&mut tui, id_str, "resume"),
}
} else if cli.resume_last {
let provider_filter = vec![config.model_provider_id.clone()];
@@ -663,6 +707,43 @@ async fn run_ratatui_app(
resume_picker::SessionSelection::StartFresh
};
let current_owner = auth_manager
.auth_cached()
.and_then(|auth| auth.get_account_email());
if let resume_picker::SessionSelection::Resume(path) = &session_selection {
match local_share_owner(path) {
Ok(Some(owner)) => {
if current_owner.as_deref() != Some(owner.as_str()) {
let session_id = read_session_meta_line(path)
.await
.ok()
.map(|meta| meta.meta.id.to_string());
let (id_display, fork_hint) = match session_id {
Some(id) => (id.clone(), format!("Use `codex fork {id}` instead.")),
None => (
"this session".to_string(),
"Use `codex fork` to select it instead.".to_string(),
),
};
return fatal_exit(
&mut tui,
format!(
"Cannot resume shared session {id_display} owned by {owner}. {fork_hint}"
),
);
}
}
Ok(None) => {}
Err(err) => {
warn!(
error = %err,
"failed to read shared session metadata; treating session as unshared"
);
}
}
}
let current_cwd = config.cwd.clone();
let allow_prompt = cli.cwd.is_none();
let action_and_path_if_resume_or_fork = match &session_selection {
@@ -672,8 +753,25 @@ async fn run_ratatui_app(
};
let fallback_cwd = match action_and_path_if_resume_or_fork {
Some((action, path)) => {
resolve_cwd_for_resume_or_fork(&mut tui, &current_cwd, path, action, allow_prompt)
.await?
let shared_owner = match action {
CwdPromptAction::Fork => local_share_owner(path).ok().flatten(),
CwdPromptAction::Resume => None,
};
let shared_by_other = shared_owner
.as_deref()
.map(|owner| current_owner.as_deref() != Some(owner))
.unwrap_or(false);
let prefer_current_cwd =
(remote_fork_downloaded || shared_by_other) && action == CwdPromptAction::Fork;
resolve_cwd_for_resume_or_fork(
&mut tui,
&current_cwd,
path,
action,
allow_prompt,
prefer_current_cwd,
)
.await?
}
None => None,
};
@@ -786,10 +884,17 @@ pub(crate) async fn resolve_cwd_for_resume_or_fork(
path: &Path,
action: CwdPromptAction,
allow_prompt: bool,
prefer_current_cwd: bool,
) -> color_eyre::Result<Option<PathBuf>> {
if prefer_current_cwd {
return Ok(Some(current_cwd.to_path_buf()));
}
let Some(history_cwd) = read_session_cwd(path).await else {
return Ok(None);
};
if action == CwdPromptAction::Fork && !history_cwd.exists() {
return Ok(Some(current_cwd.to_path_buf()));
}
if allow_prompt && cwds_differ(current_cwd, &history_cwd) {
let selection =
cwd_prompt::run_cwd_selection_prompt(tui, action, current_cwd, &history_cwd).await?;

View File

@@ -26,6 +26,7 @@ pub enum SlashCommand {
New,
Resume,
Fork,
Share,
Init,
Compact,
Plan,
@@ -67,6 +68,7 @@ impl SlashCommand {
SlashCommand::Rename => "rename the current thread",
SlashCommand::Resume => "resume a saved chat",
SlashCommand::Fork => "fork the current chat",
SlashCommand::Share => "share the current chat via object storage",
// SlashCommand::Undo => "ask Codex to undo a turn",
SlashCommand::Quit | SlashCommand::Exit => "exit Codex",
SlashCommand::Diff => "show git diff (including untracked files)",
@@ -122,6 +124,7 @@ impl SlashCommand {
SlashCommand::New
| SlashCommand::Resume
| SlashCommand::Fork
| SlashCommand::Share
| SlashCommand::Init
| SlashCommand::Compact
// | SlashCommand::Undo

View File

@@ -8,6 +8,6 @@ license.workspace = true
workspace = true
[dependencies]
assert_cmd = { workspace = true }
path-absolutize = { workspace = true }
runfiles = { workspace = true }
thiserror = { workspace = true }

View File

@@ -1,3 +1,6 @@
use path_absolutize::Absolutize;
use std::env;
use std::ffi;
use std::ffi::OsString;
use std::io;
use std::path::Path;
@@ -43,27 +46,13 @@ pub fn cargo_bin(name: &str) -> Result<PathBuf, CargoBinError> {
return resolve_bin_from_env(key, value);
}
}
match assert_cmd::Command::cargo_bin(name) {
Ok(cmd) => {
let mut path = PathBuf::from(cmd.get_program());
if !path.is_absolute() {
path = std::env::current_dir()
.map_err(|source| CargoBinError::CurrentDir { source })?
.join(path);
}
if path.exists() {
Ok(path)
} else {
Err(CargoBinError::ResolvedPathDoesNotExist {
key: "assert_cmd::Command::cargo_bin".to_owned(),
path,
})
}
}
match resolve_bin_from_target_dir(name) {
Ok(path) => Ok(path),
Err(err) => Err(CargoBinError::NotFound {
name: name.to_owned(),
env_keys,
fallback: format!("assert_cmd fallback failed: {err}"),
fallback: format!("target dir fallback failed: {err}"),
}),
}
}
@@ -106,6 +95,41 @@ fn resolve_bin_from_env(key: &str, value: OsString) -> Result<PathBuf, CargoBinE
})
}
fn resolve_bin_from_target_dir(name: &str) -> Result<PathBuf, CargoBinError> {
let target_dir = cargo_target_dir()?;
let filename = format!("{name}{}", env::consts::EXE_SUFFIX);
let candidate = target_dir.join(filename);
let abs = absolutize_from_buck_or_cwd(candidate.clone())?;
if abs.exists() {
Ok(abs)
} else {
Err(CargoBinError::ResolvedPathDoesNotExist {
key: "target_dir".to_owned(),
path: candidate,
})
}
}
fn cargo_target_dir() -> Result<PathBuf, CargoBinError> {
let current_exe = env::current_exe().map_err(|source| CargoBinError::CurrentExe { source })?;
let mut path = current_exe.parent().map(PathBuf::from).ok_or_else(|| {
CargoBinError::ResolvedPathDoesNotExist {
key: "current_exe".to_owned(),
path: current_exe.clone(),
}
})?;
if path.ends_with(ffi::OsStr::new("deps")) {
path.pop();
}
Ok(path)
}
fn absolutize_from_buck_or_cwd(path: PathBuf) -> Result<PathBuf, CargoBinError> {
path.absolutize()
.map(std::borrow::Cow::into_owned)
.map_err(|source| CargoBinError::CurrentDir { source })
}
/// Macro that derives the path to a test resource at runtime, the value of
/// which depends on whether Cargo or Bazel is being used to build and run a
/// test. Note the return value may be a relative or absolute path.

View File

@@ -24,6 +24,40 @@ Codex can run a notification hook when the agent finishes a turn. See the config
- https://developers.openai.com/codex/config-reference
## Session sharing (enterprise)
To enable `/share` for enterprise or self-hosted storage, configure:
```
session_object_storage_url = "https://your-object-store.example.com/codex-sessions/"
```
If you need a dynamic URL (for example, a short-lived token), you can provide a command
that prints the URL to stdout:
```
session_object_storage_url_cmd = ["bash", "-lc", "get-session-store-url"]
```
If both `session_object_storage_url` and `session_object_storage_url_cmd` are set, the
static URL wins.
You can also use Azure Blob Storage with a SAS URL, either as a standard HTTPS URL or the shorthand `az://` form:
```
session_object_storage_url = "https://<account>.blob.core.windows.net/<container>/codex-sessions?<sas>"
```
```
session_object_storage_url = "az://<account>/<container>/codex-sessions?<sas>"
```
For Azure, the SAS token must allow read and write access to blob objects under the prefix. Listing is not required.
If you omit the SAS token, Codex will try Azure CLI authentication (`az login`) and request storage scope tokens for the blob API.
For HTTP/HTTPS URLs, the endpoint should support `HEAD`/`PUT` for individual objects. Codex will upload the session rollout (`.jsonl`) under this prefix when sharing.
## JSON Schema
The generated JSON Schema for `config.toml` lives at `codex-rs/core/config.schema.json`.