Skip to content
Draft
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Unset segment info for web vital spans. ([#6042](https://github.com/getsentry/relay/pull/6042))
- Set sentry.trace.status on segment spans. ([#6140](https://github.com/getsentry/relay/pull/6140))
- Don't modify segment information for V2 web vital spans. ([#6160](https://github.com/getsentry/relay/pull/6160))
- Support compressed minidumps when the `relay-minidump-uploads` feature is enabled. ([#6151](https://github.com/getsentry/relay/pull/6151))

**Internal**:

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf"
dependencies = [
"brotli 8.0.2",
"bzip2",
"compression-core",
"flate2",
"liblzma",
"memchr",
"zstd",
"zstd-safe",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] }
# Keep it pinned until it's possible to disable backtrace.
anyhow = "=1.0.69"
arc-swap = "1"
async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] }
async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] }
async-trait = "0.1"
axum = "0.8"
axum-extra = "0.12"
Expand Down
120 changes: 74 additions & 46 deletions relay-server/src/endpoints/minidump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::routing::{MethodRouter, post};
use bytes::Bytes;
use bzip2::read::BzDecoder;
use flate2::read::GzDecoder;
use futures::{self, Stream};
use futures::{self, Stream, StreamExt, TryStreamExt};
use liblzma::read::XzDecoder;
use multer::{Field, Multipart};
use relay_config::Config;
Expand All @@ -18,6 +18,8 @@ use std::convert::Infallible;
use std::error::Error;
use std::io::Cursor;
use std::io::Read;
use tokio::io::BufReader;
use tokio_util::io::{ReaderStream, StreamReader};
use tower_http::limit::RequestBodyLimitLayer;
use zstd::stream::Decoder as ZstdDecoder;

Expand All @@ -30,7 +32,7 @@ use crate::middlewares;
use crate::service::ServiceState;
use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome};
use crate::services::projects::project::ProjectState;
use crate::services::upload::{ProjectContext, Upload};
use crate::services::upload::{ByteStream, ProjectContext, Upload};
use crate::statsd::RelayCounters;
use crate::utils::{self, AttachmentStrategy, read_bytes_into_item};

Expand Down Expand Up @@ -69,34 +71,31 @@ const MAGIC_PEEK: usize = 6;
/// Content types by which standalone uploads can be recognized.
const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"];

#[derive(Debug, thiserror::Error)]
enum PeekError<E> {
#[error("compressed minidump payloads are not supported for streaming upload")]
Compressed,
#[error(transparent)]
Source(#[from] E),
macro_rules! wrap_decode {
($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }};
}

/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd).
/// Returns the original stream contents if not.
async fn reject_if_compressed<S, E>(
stream: S,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send, PeekError<E>>
/// Peek the first bytes of `stream` and returns a decoding wrapper if necessary.
///
/// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses
/// one of the minidump container formats we support for inline uploads.
async fn decode_stream<S, E>(stream: S) -> std::io::Result<ByteStream>
where
S: Stream<Item = Result<Bytes, E>> + Send,
E: Send,
S: Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<Box<dyn Error + Send + Sync>> + Send + 'static,
{
let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?;
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};

if head.starts_with(GZIP_MAGIC_HEADER)
|| head.starts_with(XZ_MAGIC_HEADER)
|| head.starts_with(BZIP2_MAGIC_HEADER)
|| head.starts_with(ZSTD_MAGIC_HEADER)
{
Err(PeekError::Compressed)
} else {
Ok(stream)
}
let stream = stream.map_err(std::io::Error::other);
let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?;
let decoded = match Compression::from(&head) {
Compression::None => stream.boxed(),
Compression::Zstd => wrap_decode!(stream, ZstdDecoder),
Compression::Gzip => wrap_decode!(stream, GzipDecoder),
Compression::Xz => wrap_decode!(stream, XzDecoder),
Compression::Bzip2 => wrap_decode!(stream, BzDecoder),
};
Ok(decoded)
}

fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> {
Expand All @@ -118,25 +117,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result<Vec<u8>> {
Ok(buffer)
}

/// Creates a decoder based on the magic bytes the minidump payload
/// Types of compression we support for minidump payloads.
enum Compression {
None,
Gzip,
Xz,
Bzip2,
Zstd,
}

impl Compression {
fn from(header: &[u8]) -> Self {
if header.starts_with(GZIP_MAGIC_HEADER) {
Self::Gzip
} else if header.starts_with(XZ_MAGIC_HEADER) {
Self::Xz
} else if header.starts_with(BZIP2_MAGIC_HEADER) {
Self::Bzip2
} else if header.starts_with(ZSTD_MAGIC_HEADER) {
Self::Zstd
} else {
Self::None
}
}
}

/// Creates a decoder based on the magic bytes in the minidump payload.
fn decoder_from(minidump_data: Bytes) -> Option<Box<dyn Read>> {
if minidump_data.starts_with(GZIP_MAGIC_HEADER) {
return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(XZ_MAGIC_HEADER) {
return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) {
return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) {
return match ZstdDecoder::new(Cursor::new(minidump_data)) {
match Compression::from(&minidump_data) {
Compression::None => None,
Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))),
Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))),
Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))),
Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) {
Ok(decoder) => Some(Box::new(decoder)),
Err(ref err) => {
relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder");
None
}
};
},
}

None
}

/// Tries to decode a minidump using any of the supported compression formats
Expand Down Expand Up @@ -315,11 +335,11 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> {
}
}

/// Wrapper around [`upload_stream`] that enforces that minidumps are not compressed.
/// Wrapper around [`upload_to_objectstore`] that decompresses minidumps if necessary.
pub async fn upload_stream_checked<S, E>(
stream: S,
content_type: Option<String>,
item: Managed<Item>,
mut item: Managed<Item>,
config: &Config,
project: ProjectContext,
upload: &Addr<Upload>,
Expand All @@ -343,17 +363,26 @@ where
.map_err(|_| BadStoreRequest::UploadFailed);
}

let stream = match reject_if_compressed(stream).await {
Ok(stream) => stream,
let stream = match decode_stream(stream).await {
Ok(decoded) => decoded,
Err(_) => {
let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump));
return Err(BadStoreRequest::InvalidMinidump);
}
};

item.modify(|item, _| {
if let Some(filename) = item.filename() {
let new_filename = remove_container_extension(filename);
if new_filename != filename {
let new_filename = new_filename.to_owned();
item.set_filename(new_filename);
}
}
});
upload_stream(
stream,
content_type,
Some(ContentType::Minidump.to_string()),
item,
config,
project,
Expand Down Expand Up @@ -496,7 +525,6 @@ async fn raw_minidump_to_item(
request: Request,
meta: &RequestMeta,
state: &ServiceState,
content_type: RawContentType,
upload_context: Option<UploadContext<'_>>,
) -> Result<Managed<Item>, BadStoreRequest> {
debug_assert!(!matches!(
Expand All @@ -511,13 +539,13 @@ async fn raw_minidump_to_item(
if let Some(upload_context) = upload_context
&& matches!(upload_context.upload_minidumps, UploadDecision::Upload)
{
let stream = reject_if_compressed(request.into_body().into_data_stream())
let stream = decode_stream(request.into_body().into_data_stream())
.await
.map_err(|_| BadStoreRequest::InvalidMinidump)?;

item = upload_stream(
stream,
Some(content_type.to_string()).filter(|s| !s.is_empty()),
Some(ContentType::Minidump.to_string()),
item,
state.config(),
upload_context.project,
Expand Down Expand Up @@ -548,7 +576,7 @@ async fn items(
request: Request,
) -> Result<Managed<Items>, BadStoreRequest> {
let items = if MINIDUMP_RAW_CONTENT_TYPES.contains(&content_type.as_ref()) {
raw_minidump_to_item(request, meta, state, content_type, upload_context)
raw_minidump_to_item(request, meta, state, upload_context)
.await?
.map(|item, _| smallvec![item])
} else {
Expand Down
Loading
Loading