diff --git a/CHANGELOG.md b/CHANGELOG.md index 013d35e3cd8..63c8dbaa570 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Unset segment info for web vital spans. ([#6042](https://github.com/getsentry/relay/pull/6042)) - Set sentry.trace.status on segment spans. ([#6140](https://github.com/getsentry/relay/pull/6140)) - Don't modify segment information for V2 web vital spans. ([#6160](https://github.com/getsentry/relay/pull/6160)) +- Support compressed minidumps when the `relay-minidump-uploads` feature is enabled. ([#6151](https://github.com/getsentry/relay/pull/6151)) **Internal**: diff --git a/Cargo.lock b/Cargo.lock index 20cfe9e18e7..7868ad66b8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "brotli 8.0.2", + "bzip2", "compression-core", "flate2", + "liblzma", "memchr", "zstd", "zstd-safe", diff --git a/Cargo.toml b/Cargo.toml index b337d207034..2237d748b54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" -async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index a1c0b583284..54217d4861d 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -5,7 +5,7 @@ use axum::routing::{MethodRouter, post}; use bytes::Bytes; use bzip2::read::BzDecoder; use flate2::read::GzDecoder; -use futures::{self, Stream}; +use futures::{self, Stream, StreamExt, TryStreamExt}; use liblzma::read::XzDecoder; use multer::{Field, Multipart}; use relay_config::Config; @@ -18,6 +18,8 @@ use std::convert::Infallible; use std::error::Error; use std::io::Cursor; use std::io::Read; +use tokio::io::BufReader; +use tokio_util::io::{ReaderStream, StreamReader}; use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; @@ -30,7 +32,7 @@ use crate::middlewares; use crate::service::ServiceState; use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; use crate::services::projects::project::ProjectState; -use crate::services::upload::{ProjectContext, Upload}; +use crate::services::upload::{ByteStream, ProjectContext, Upload}; use crate::statsd::RelayCounters; use crate::utils::{self, AttachmentStrategy, read_bytes_into_item}; @@ -69,34 +71,31 @@ const MAGIC_PEEK: usize = 6; /// Content types by which standalone uploads can be recognized. const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"]; -#[derive(Debug, thiserror::Error)] -enum PeekError { - #[error("compressed minidump payloads are not supported for streaming upload")] - Compressed, - #[error(transparent)] - Source(#[from] E), +macro_rules! wrap_decode { + ($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }}; } -/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd). -/// Returns the original stream contents if not. -async fn reject_if_compressed( - stream: S, -) -> Result> + Send, PeekError> +/// Peek the first bytes of `stream` and returns a decoding wrapper if necessary. +/// +/// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses +/// one of the minidump container formats we support for inline uploads. +async fn decode_stream(stream: S) -> std::io::Result where - S: Stream> + Send, - E: Send, + S: Stream> + Send + 'static, + E: Into> + Send + 'static, { - let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; + use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder}; - if head.starts_with(GZIP_MAGIC_HEADER) - || head.starts_with(XZ_MAGIC_HEADER) - || head.starts_with(BZIP2_MAGIC_HEADER) - || head.starts_with(ZSTD_MAGIC_HEADER) - { - Err(PeekError::Compressed) - } else { - Ok(stream) - } + let stream = stream.map_err(std::io::Error::other); + let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; + let decoded = match Compression::from(&head) { + Compression::None => stream.boxed(), + Compression::Zstd => wrap_decode!(stream, ZstdDecoder), + Compression::Gzip => wrap_decode!(stream, GzipDecoder), + Compression::Xz => wrap_decode!(stream, XzDecoder), + Compression::Bzip2 => wrap_decode!(stream, BzDecoder), + }; + Ok(decoded) } fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> { @@ -118,25 +117,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { Ok(buffer) } -/// Creates a decoder based on the magic bytes the minidump payload +/// Types of compression we support for minidump payloads. +enum Compression { + None, + Gzip, + Xz, + Bzip2, + Zstd, +} + +impl Compression { + fn from(header: &[u8]) -> Self { + if header.starts_with(GZIP_MAGIC_HEADER) { + Self::Gzip + } else if header.starts_with(XZ_MAGIC_HEADER) { + Self::Xz + } else if header.starts_with(BZIP2_MAGIC_HEADER) { + Self::Bzip2 + } else if header.starts_with(ZSTD_MAGIC_HEADER) { + Self::Zstd + } else { + Self::None + } + } +} + +/// Creates a decoder based on the magic bytes in the minidump payload. fn decoder_from(minidump_data: Bytes) -> Option> { - if minidump_data.starts_with(GZIP_MAGIC_HEADER) { - return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(XZ_MAGIC_HEADER) { - return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) { - return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) { - return match ZstdDecoder::new(Cursor::new(minidump_data)) { + match Compression::from(&minidump_data) { + Compression::None => None, + Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), + Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), + Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), + Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) { Ok(decoder) => Some(Box::new(decoder)), Err(ref err) => { relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder"); None } - }; + }, } - - None } /// Tries to decode a minidump using any of the supported compression formats @@ -315,11 +335,11 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> { } } -/// Wrapper around [`upload_stream`] that enforces that minidumps are not compressed. +/// Wrapper around [`upload_to_objectstore`] that decompresses minidumps if necessary. pub async fn upload_stream_checked( stream: S, content_type: Option, - item: Managed, + mut item: Managed, config: &Config, project: ProjectContext, upload: &Addr, @@ -343,17 +363,26 @@ where .map_err(|_| BadStoreRequest::UploadFailed); } - let stream = match reject_if_compressed(stream).await { - Ok(stream) => stream, + let stream = match decode_stream(stream).await { + Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); return Err(BadStoreRequest::InvalidMinidump); } }; + item.modify(|item, _| { + if let Some(filename) = item.filename() { + let new_filename = remove_container_extension(filename); + if new_filename != filename { + let new_filename = new_filename.to_owned(); + item.set_filename(new_filename); + } + } + }); upload_stream( stream, - content_type, + Some(ContentType::Minidump.to_string()), item, config, project, @@ -496,7 +525,6 @@ async fn raw_minidump_to_item( request: Request, meta: &RequestMeta, state: &ServiceState, - content_type: RawContentType, upload_context: Option>, ) -> Result, BadStoreRequest> { debug_assert!(!matches!( @@ -511,13 +539,13 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let stream = reject_if_compressed(request.into_body().into_data_stream()) + let stream = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; item = upload_stream( stream, - Some(content_type.to_string()).filter(|s| !s.is_empty()), + Some(ContentType::Minidump.to_string()), item, state.config(), upload_context.project, @@ -548,7 +576,7 @@ async fn items( request: Request, ) -> Result, BadStoreRequest> { let items = if MINIDUMP_RAW_CONTENT_TYPES.contains(&content_type.as_ref()) { - raw_minidump_to_item(request, meta, state, content_type, upload_context) + raw_minidump_to_item(request, meta, state, upload_context) .await? .map(|item, _| smallvec![item]) } else { diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index c45108d4079..2f28b3b504d 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -491,17 +491,20 @@ def test_minidump_invalid_nested_formdata(mini_sentry, relay): @pytest.mark.parametrize( - "rate_limit,minidump_filename,use_objectstore", + "rate_limit,minidump_filename,use_objectstore,stream_upload", [ - (None, "minidump.dmp", True), - (None, "minidump.dmp", False), - ("attachment", "minidump.dmp", True), - ("attachment", "minidump.dmp", False), - ("transaction", "minidump.dmp", False), - (None, "minidump.dmp.gz", False), - (None, "minidump.dmp.xz", False), - (None, "minidump.dmp.bz2", False), - (None, "minidump.dmp.zst", False), + (None, "minidump.dmp", True, False), + (None, "minidump.dmp", False, False), + ("attachment", "minidump.dmp", True, False), + ("attachment", "minidump.dmp", False, False), + ("transaction", "minidump.dmp", False, False), + (None, "minidump.dmp.gz", False, False), + (None, "minidump.dmp.xz", False, False), + (None, "minidump.dmp.bz2", False, False), + (None, "minidump.dmp.bz2", True, False), + (None, "minidump.dmp.zst", False, False), + (None, "minidump.dmp.zst", True, False), + (None, "minidump.dmp.zst", True, True), ], ) def test_minidump_with_processing( @@ -513,6 +516,7 @@ def test_minidump_with_processing( minidump_filename, use_objectstore, objectstore, + stream_upload, ): dmp_path = os.path.join(os.path.dirname(__file__), "fixtures/native/minidump.dmp") with open(dmp_path, "rb") as f: @@ -533,6 +537,10 @@ def test_minidump_with_processing( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["eventRetention"] = 50000 + if stream_upload: + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) options = { "processing": { @@ -595,10 +603,15 @@ def test_minidump_with_processing( assert event["exception"]["values"][0]["mechanism"]["type"] == "minidump" # Check information extracted from the minidump - assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) + if not stream_upload: + assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) # Check that the SDK name is correctly detected - assert event["sdk"]["name"] == "minidump.unknown" + assert ( + event["sdk"]["name"] == "minidump.upload" + if stream_upload + else "minidump.unknown" + ) if not use_objectstore: assert list(message["attachments"]) == [ @@ -1134,6 +1147,7 @@ def test_minidump_objectstore_uploads( ) assert json.loads(minidump.payload.bytes) == { "location": DUMMY_UPLOAD_LOCATION, + "content_type": "application/x-dmp", } else: assert ( @@ -1620,72 +1634,6 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry, assert envelope.items[0].payload.bytes == minidump_content -@pytest.mark.parametrize( - "magic,filename", - [ - pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"), - pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"), - pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"), - pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"), - ], -) -def test_minidump_objectstore_uploads_rejects_compressed( - mini_sentry, - relay, - magic, - filename, -): - """ - When streaming a minidump to objectstore, a compressed payload should be reject - (untill objectstore or minidump can handle them). - """ - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []).append( - "projects:relay-minidump-uploads" - ) - - relay = relay( - mini_sentry, - options={ - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - }, - }, - ) - - with pytest.raises(HTTPError) as exc_info: - relay.send_minidump( - project_id=project_id, - files=[(MINIDUMP_ATTACHMENT_NAME, filename, magic + b"\x00" * 32)], - ) - - assert exc_info.value.response.status_code == 400 - - assert mini_sentry.get_aggregated_outcomes() == [ - { - "category": 1, - "outcome": 3, - "quantity": 1, - "reason": "invalid_minidump", - }, - { - "category": 4, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - { - "category": 22, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - ] - - def test_minidump_upload_failure_bubbles_up(mini_sentry, relay): project_id = 42 minidump_content = b"MDMP content" @@ -1741,6 +1689,28 @@ def create(**opts): ] +def test_faulty_compression_stream(mini_sentry, relay, dummy_upload): + project_id = 42 + minidump_content = b"\x1f\x8b" + b"not a valid gzip stream" + + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) + + relay = relay(mini_sentry) + + response = relay.send_minidump( + project_id=project_id, + files=[(MINIDUMP_ATTACHMENT_NAME, "minidump.dmp.gz", minidump_content)], + raise_for_status=False, + ) + + assert response.status_code == 400 + assert response.json()["detail"] == "invalid compression" + assert mini_sentry.captured_envelopes.empty() + + def test_minidump_proxy_mode(mini_sentry, relay): project_id = 42 mini_sentry.add_full_project_config(project_id)