From d466139045ae3b420c9613ee6c3c6b8d527fe2a7 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 29 Jun 2026 20:33:52 +0200 Subject: [PATCH 01/13] ref: compression enum --- relay-server/src/endpoints/minidump.rs | 58 +++++++++++++++++--------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index f8bc8c290cc..9ad15e8df9f 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -88,14 +88,11 @@ where { let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; - if head.starts_with(GZIP_MAGIC_HEADER) - || head.starts_with(XZ_MAGIC_HEADER) - || head.starts_with(BZIP2_MAGIC_HEADER) - || head.starts_with(ZSTD_MAGIC_HEADER) - { - Err(PeekError::Compressed) - } else { - Ok(stream) + match Compression::from(&head) { + Compression::NoCompression => Ok(stream), + Compression::Gzip | Compression::Xz | Compression::Bzip2 | Compression::Zstd => { + Err(PeekError::Compressed) + } } } @@ -118,25 +115,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { Ok(buffer) } -/// Creates a decoder based on the magic bytes the minidump payload +/// Types of compression we support for minidump payloads. +enum Compression { + NoCompression, + Gzip, + Xz, + Bzip2, + Zstd, +} + +impl Compression { + fn from(header: &[u8]) -> Self { + if header.starts_with(GZIP_MAGIC_HEADER) { + Self::Gzip + } else if header.starts_with(XZ_MAGIC_HEADER) { + Self::Xz + } else if header.starts_with(BZIP2_MAGIC_HEADER) { + Self::Bzip2 + } else if header.starts_with(ZSTD_MAGIC_HEADER) { + Self::Zstd + } else { + Self::NoCompression + } + } +} + +/// Creates a decoder based on the magic bytes in the minidump payload. fn decoder_from(minidump_data: Bytes) -> Option> { - if minidump_data.starts_with(GZIP_MAGIC_HEADER) { - return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(XZ_MAGIC_HEADER) { - return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) { - return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) { - return match ZstdDecoder::new(Cursor::new(minidump_data)) { + match Compression::from(&minidump_data) { + Compression::NoCompression => None, + Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), + Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), + Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), + Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) { Ok(decoder) => Some(Box::new(decoder)), Err(ref err) => { relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder"); None } - }; + }, } - - None } /// Tries to decode a minidump using any of the supported compression formats From ec1d33950d66e7255cab2110689145eeb1980287 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:00:37 +0200 Subject: [PATCH 02/13] wip --- Cargo.lock | 2 + Cargo.toml | 2 +- relay-server/src/endpoints/common.rs | 6 ++- relay-server/src/endpoints/minidump.rs | 56 +++++++++++++---------- relay-server/src/endpoints/playstation.rs | 1 + relay-server/src/endpoints/upload.rs | 1 + relay-server/src/services/upload.rs | 31 +++++++++++-- 7 files changed, 69 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af2180e5df3..4bd370dfde4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "brotli 8.0.2", + "bzip2", "compression-core", "flate2", + "liblzma", "memchr", "zstd", "zstd-safe", diff --git a/Cargo.toml b/Cargo.toml index 5932888d8d8..21857231a0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" -async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 00de250cb54..024fff00d9a 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -24,7 +24,7 @@ use crate::service::ServiceState; use crate::services::buffer::{ProjectKeyPair, PushError}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome}; use crate::services::processor::{BucketSource, MetricData, ProcessMetrics}; -use crate::services::upload::{Create, ProjectContext, Stream, Upload}; +use crate::services::upload::{ContentEncoding, Create, ProjectContext, Stream, Upload}; use crate::statsd::{RelayCounters, RelayDistributions}; use crate::utils::{ self, ApiErrorResponse, BoundedStream, FormDataIter, MeteredStream, find_error_source, @@ -549,6 +549,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// [AttachmentPlaceholder] as payload. pub async fn upload_to_objectstore( stream: S, + content_encoding: Option, content_type: Option, mut item: Managed, config: &Config, @@ -562,6 +563,7 @@ where { let res = upload_to_objectstore_inner( stream, + content_encoding, content_type, &mut item, config, @@ -578,6 +580,7 @@ where async fn upload_to_objectstore_inner( stream: S, + content_encoding: Option, content_type: Option, item: &mut Managed, config: &Config, @@ -612,6 +615,7 @@ where project, location, stream, + content_encoding, }) .await .ok()?; diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index 9ad15e8df9f..b98f46f10a4 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -1,3 +1,6 @@ +use async_compression::tokio::bufread::{ + BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzipDecoder, XzDecoder as AsyncXzDecoder, +}; use axum::RequestExt; use axum::extract::{DefaultBodyLimit, Request}; use axum::response::IntoResponse; @@ -5,7 +8,7 @@ use axum::routing::{MethodRouter, post}; use bytes::Bytes; use bzip2::read::BzDecoder; use flate2::read::GzDecoder; -use futures::{self, Stream}; +use futures::{self, Stream, StreamExt, TryStreamExt}; use liblzma::read::XzDecoder; use multer::{Field, Multipart}; use relay_config::Config; @@ -18,6 +21,8 @@ use std::convert::Infallible; use std::error::Error; use std::io::Cursor; use std::io::Read; +use tokio::io::BufReader; +use tokio_util::io::{ReaderStream, StreamReader}; use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; @@ -30,7 +35,7 @@ use crate::middlewares; use crate::service::ServiceState; use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; use crate::services::projects::project::ProjectState; -use crate::services::upload::{ProjectContext, Upload}; +use crate::services::upload::{ByteStream, ContentEncoding, ProjectContext, Upload}; use crate::statsd::RelayCounters; use crate::utils::{self, AttachmentStrategy, read_bytes_into_item}; @@ -69,31 +74,29 @@ const MAGIC_PEEK: usize = 6; /// Content types by which standalone uploads can be recognized. const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"]; -#[derive(Debug, thiserror::Error)] -enum PeekError { - #[error("compressed minidump payloads are not supported for streaming upload")] - Compressed, - #[error(transparent)] - Source(#[from] E), +macro_rules! wrap_decode { + ($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }}; } -/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd). -/// Returns the original stream contents if not. -async fn reject_if_compressed( - stream: S, -) -> Result> + Send, PeekError> +/// Peek the first bytes of `stream` and returns a decoding wrapper if necessary. +/// +/// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses +/// one of the minidump container formats we support for inline uploads. +async fn decode_stream(stream: S) -> std::io::Result<(ByteStream, Option)> where - S: Stream> + Send, - E: Send, + S: Stream> + Send + 'static, + E: Into> + Send + 'static, { + let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; - - match Compression::from(&head) { - Compression::NoCompression => Ok(stream), - Compression::Gzip | Compression::Xz | Compression::Bzip2 | Compression::Zstd => { - Err(PeekError::Compressed) - } - } + let decoded = match Compression::from(&head) { + Compression::NoCompression => (stream.boxed(), None), + Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), + Compression::Gzip => (wrap_decode!(stream, AsyncGzipDecoder), None), + Compression::Xz => (wrap_decode!(stream, AsyncXzDecoder), None), + Compression::Bzip2 => (wrap_decode!(stream, AsyncBzDecoder), None), + }; + Ok(decoded) } fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> { @@ -350,6 +353,7 @@ where if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { return upload_to_objectstore( stream, + None, content_type, item, config, @@ -361,8 +365,8 @@ where .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let stream = match reject_if_compressed(stream).await { - Ok(stream) => stream, + let (stream, encoding) = match decode_stream(stream).await { + Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); return Err(BadStoreRequest::InvalidMinidump); @@ -371,6 +375,7 @@ where upload_to_objectstore( stream, + encoding, content_type, item, config, @@ -529,12 +534,13 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let stream = reject_if_compressed(request.into_body().into_data_stream()) + let (stream, encoding) = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; item = upload_to_objectstore( stream, + encoding, Some(content_type.to_string()).filter(|s| !s.is_empty()), item, state.config(), diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index d888b6d8da2..d44ae397bfe 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -149,6 +149,7 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { let content_type = field.content_type().map(ToString::to_string); Ok(common::upload_to_objectstore( field, + None, content_type, item, config, diff --git a/relay-server/src/endpoints/upload.rs b/relay-server/src/endpoints/upload.rs index 021a7bf51c9..16e7252b788 100644 --- a/relay-server/src/endpoints/upload.rs +++ b/relay-server/src/endpoints/upload.rs @@ -326,6 +326,7 @@ async fn upload( project, location, stream, + content_encoding: None, // NOTE: we could pass through zstd here if we can skip `RequestDecompressionLayer`. }) .await??; diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 79f320b245c..5fdaded2dab 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -146,6 +146,18 @@ pub struct Stream { pub location: SignedLocation, /// The body to be uploaded to objectstore, with length validation. pub stream: BoundedStream>, + /// The content encoding of the stream. + /// + /// If `Some`, the service treats the stream as already encoded. + /// If `None`, the service will apply zstd compression to the stream while uploading. + pub content_encoding: Option, +} + +/// Type of compression that both Relay and Objectstore support. +/// +/// This can be used to pass compressed streams through to Objectstore without decoding. +pub enum ContentEncoding { + Zstd, } impl FromMessage for Upload { @@ -293,10 +305,16 @@ impl Service { project, location, stream, + content_encoding, } = stream; match &self.backend { Backend::Upstream { addr } => { - let (request, rx) = UploadRequest::upload(project, location.try_to_uri()?, stream); + let (request, rx) = UploadRequest::upload( + project, + location.try_to_uri()?, + stream, + content_encoding, + ); addr.send(SendRequest(request)); let response = rx.await??; SignedLocation::try_from_response(response) @@ -677,6 +695,7 @@ enum RequestKind { uri: String, stream: TakeOnce>>, encoding: HttpEncoding, + content_encoding: Option, }, } @@ -715,6 +734,7 @@ impl UploadRequest { project: ProjectContext, uri: String, stream: BoundedStream>, + content_encoding: Option, ) -> ( Self, oneshot::Receiver>, @@ -727,6 +747,7 @@ impl UploadRequest { uri, stream: TakeOnce::new(stream), encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() + content_encoding, }, sender, }, @@ -809,6 +830,7 @@ impl UpstreamRequest for UploadRequest { uri: _, stream, encoding, + content_encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { relay_log::error!("upload request stream was already consumed"); @@ -816,8 +838,11 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let body = encode_body(body, *encoding); - builder.content_encoding(*encoding); + let (body, encoding) = match content_encoding { + Some(ContentEncoding::Zstd) => (body.boxed(), HttpEncoding::Zstd), + None => (encode_body(body, *encoding), *encoding), + }; + builder.content_encoding(encoding); builder.body(reqwest::Body::wrap_stream(body)); } From 29fa09273b98f3addfa44c67ee03ec17a5177ed4 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:10:50 +0200 Subject: [PATCH 03/13] simplify --- relay-server/src/endpoints/minidump.rs | 11 +++++---- relay-server/src/services/upload.rs | 31 +++++++------------------- 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index b98f46f10a4..a65e5bd544a 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -1,6 +1,3 @@ -use async_compression::tokio::bufread::{ - BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzipDecoder, XzDecoder as AsyncXzDecoder, -}; use axum::RequestExt; use axum::extract::{DefaultBodyLimit, Request}; use axum::response::IntoResponse; @@ -87,14 +84,16 @@ where S: Stream> + Send + 'static, E: Into> + Send + 'static, { + use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; + let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; let decoded = match Compression::from(&head) { Compression::NoCompression => (stream.boxed(), None), Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), - Compression::Gzip => (wrap_decode!(stream, AsyncGzipDecoder), None), - Compression::Xz => (wrap_decode!(stream, AsyncXzDecoder), None), - Compression::Bzip2 => (wrap_decode!(stream, AsyncBzDecoder), None), + Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), + Compression::Xz => (wrap_decode!(stream, XzDecoder), None), + Compression::Bzip2 => (wrap_decode!(stream, BzDecoder), None), }; Ok(decoded) } diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 5fdaded2dab..c26bfec41a5 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -6,7 +6,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; +use async_compression::tokio::bufread::ZstdEncoder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -694,7 +694,6 @@ enum RequestKind { Upload { uri: String, stream: TakeOnce>>, - encoding: HttpEncoding, content_encoding: Option, }, } @@ -746,7 +745,6 @@ impl UploadRequest { kind: RequestKind::Upload { uri, stream: TakeOnce::new(stream), - encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() content_encoding, }, sender, @@ -829,7 +827,6 @@ impl UpstreamRequest for UploadRequest { RequestKind::Upload { uri: _, stream, - encoding, content_encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { @@ -838,13 +835,13 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let (body, encoding) = match content_encoding { - Some(ContentEncoding::Zstd) => (body.boxed(), HttpEncoding::Zstd), - None => (encode_body(body, *encoding), *encoding), + let zstd_body = match content_encoding { + Some(ContentEncoding::Zstd) => reqwest::Body::wrap_stream(body), + None => reqwest::Body::wrap_stream(encode_body(body)), }; - builder.content_encoding(encoding); - builder.body(reqwest::Body::wrap_stream(body)); + builder.content_encoding(HttpEncoding::Zstd); + builder.body(zstd_body); } }; @@ -854,26 +851,14 @@ impl UpstreamRequest for UploadRequest { Ok(()) } - - fn configure(&mut self, config: &Config) { - if let RequestKind::Upload { encoding, .. } = &mut self.kind { - *encoding = config.http_encoding(); - } - } } -fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream +fn encode_body(stream: S) -> ByteStream where S: futures::Stream> + Send + 'static, { let reader = BufReader::new(StreamReader::new(stream)); - match encoding { - HttpEncoding::Identity => ReaderStream::new(reader).boxed(), - HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), - HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), - HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), - HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), - } + ReaderStream::new(ZstdEncoder::new(reader)).boxed() } #[cfg(test)] From e2dbf10b07c9f17388017db67b890d3b1d6e6c8e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:24:42 +0200 Subject: [PATCH 04/13] lint --- relay-server/src/endpoints/common.rs | 37 ++++++++------- relay-server/src/endpoints/minidump.rs | 55 +++++++++++------------ relay-server/src/endpoints/playstation.rs | 11 +++-- 3 files changed, 53 insertions(+), 50 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 024fff00d9a..62e2d4485a7 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -548,9 +548,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// Uploads the content of `field` to the objectstore and returns an [Item] with an /// [AttachmentPlaceholder] as payload. pub async fn upload_to_objectstore( - stream: S, - content_encoding: Option, - content_type: Option, + stream: StreamWithHeaders, mut item: Managed, config: &Config, project: ProjectContext, @@ -561,27 +559,26 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { - let res = upload_to_objectstore_inner( - stream, - content_encoding, - content_type, - &mut item, - config, - project, - upload, - referrer, - ) - .await; + let res = + upload_to_objectstore_inner(stream, &mut item, config, project, upload, referrer).await; match res { Some(()) => Ok(item), None => Err(Outcome::Invalid(DiscardReason::Internal)).reject(&item), } } +/// A stream with metadata for request submission. +pub struct StreamWithHeaders { + /// The stream of data. + pub stream: S, + /// What the data is currently compressed as. + pub content_encoding: Option, + /// Content-type header to forward. + pub content_type: Option, +} + async fn upload_to_objectstore_inner( - stream: S, - content_encoding: Option, - content_type: Option, + stream: StreamWithHeaders, item: &mut Managed, config: &Config, project: ProjectContext, @@ -592,6 +589,12 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { + let StreamWithHeaders { + stream, + content_encoding, + content_type, + } = stream; + let stream: BoxStream<'static, io::Result> = Box::pin(stream.map_err(io::Error::other)); let stream = MeteredStream::new(stream, referrer); let stream = BoundedStream::new(stream, 1, config.max_upload_size()); diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index a65e5bd544a..777c5714cbc 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -24,7 +24,9 @@ use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT}; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore}; +use crate::endpoints::common::{ + self, BadStoreRequest, StreamWithHeaders, TextResponse, upload_to_objectstore, +}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -89,7 +91,7 @@ where let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; let decoded = match Compression::from(&head) { - Compression::NoCompression => (stream.boxed(), None), + Compression::None => (stream.boxed(), None), Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), Compression::Xz => (wrap_decode!(stream, XzDecoder), None), @@ -119,7 +121,7 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { /// Types of compression we support for minidump payloads. enum Compression { - NoCompression, + None, Gzip, Xz, Bzip2, @@ -137,7 +139,7 @@ impl Compression { } else if header.starts_with(ZSTD_MAGIC_HEADER) { Self::Zstd } else { - Self::NoCompression + Self::None } } } @@ -145,7 +147,7 @@ impl Compression { /// Creates a decoder based on the magic bytes in the minidump payload. fn decoder_from(minidump_data: Bytes) -> Option> { match Compression::from(&minidump_data) { - Compression::NoCompression => None, + Compression::None => None, Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), @@ -350,21 +352,17 @@ where E: Into> + Send + 'static, { if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { - return upload_to_objectstore( + let stream = StreamWithHeaders { stream, - None, + content_encoding: None, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); + }; + return upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let (stream, encoding) = match decode_stream(stream).await { + let (stream, content_encoding) = match decode_stream(stream).await { Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); @@ -372,18 +370,14 @@ where } }; - upload_to_objectstore( + let stream = StreamWithHeaders { stream, - encoding, + content_encoding, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) + }; + upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) } async fn multipart_to_items( @@ -533,14 +527,17 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let (stream, encoding) = decode_stream(request.into_body().into_data_stream()) + let (stream, content_encoding) = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; + let stream = StreamWithHeaders { + stream, + content_encoding, + content_type: Some(content_type.to_string()).filter(|s| !s.is_empty()), + }; item = upload_to_objectstore( stream, - encoding, - Some(content_type.to_string()).filter(|s| !s.is_empty()), item, state.config(), upload_context.project, diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index d44ae397bfe..cfc2b378637 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -13,7 +13,7 @@ use relay_system::Addr; use serde::Serialize; use tower_http::limit::RequestBodyLimitLayer; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse}; +use crate::endpoints::common::{self, BadStoreRequest, StreamWithHeaders, TextResponse}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -147,10 +147,13 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { match &self.upload_context { Some(upload_context) if self.infer_type(&field) != AttachmentType::Prosperodump => { let content_type = field.content_type().map(ToString::to_string); - Ok(common::upload_to_objectstore( - field, - None, + let stream = StreamWithHeaders { + stream: field, + content_encoding: None, content_type, + }; + Ok(common::upload_to_objectstore( + stream, item, config, upload_context.project.clone(), From aa3da0f5a7c77373a641aebeedc736d5ab6c618b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 15:43:50 +0200 Subject: [PATCH 05/13] update test --- relay-server/src/endpoints/common.rs | 4 ++ tests/integration/test_minidump.py | 103 +++++++-------------------- 2 files changed, 29 insertions(+), 78 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 62e2d4485a7..a2a6f3d5309 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -608,6 +608,7 @@ where }) .await .ok()? + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let scoping = project.scoping; @@ -621,6 +622,7 @@ where content_encoding, }) .await + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = result @@ -634,6 +636,7 @@ where "multipart item upload failed", ); }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = location.into_header_value().ok()?; let location = location.to_str().ok()?; @@ -641,6 +644,7 @@ where location, content_type, }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; item.modify(|inner, records| { diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 332cb918532..be6447db33e 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -491,17 +491,20 @@ def test_minidump_invalid_nested_formdata(mini_sentry, relay): @pytest.mark.parametrize( - "rate_limit,minidump_filename,use_objectstore", + "rate_limit,minidump_filename,use_objectstore,stream_upload", [ - (None, "minidump.dmp", True), - (None, "minidump.dmp", False), - ("attachment", "minidump.dmp", True), - ("attachment", "minidump.dmp", False), - ("transaction", "minidump.dmp", False), - (None, "minidump.dmp.gz", False), - (None, "minidump.dmp.xz", False), - (None, "minidump.dmp.bz2", False), - (None, "minidump.dmp.zst", False), + (None, "minidump.dmp", True, False), + (None, "minidump.dmp", False, False), + ("attachment", "minidump.dmp", True, False), + ("attachment", "minidump.dmp", False, False), + ("transaction", "minidump.dmp", False, False), + (None, "minidump.dmp.gz", False, False), + (None, "minidump.dmp.xz", False, False), + (None, "minidump.dmp.bz2", False, False), + (None, "minidump.dmp.bz2", True, False), + (None, "minidump.dmp.zst", False, False), + (None, "minidump.dmp.zst", True, False), + (None, "minidump.dmp.zst", True, True), ], ) def test_minidump_with_processing( @@ -513,6 +516,7 @@ def test_minidump_with_processing( minidump_filename, use_objectstore, objectstore, + stream_upload, ): dmp_path = os.path.join(os.path.dirname(__file__), "fixtures/native/minidump.dmp") with open(dmp_path, "rb") as f: @@ -533,6 +537,10 @@ def test_minidump_with_processing( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["eventRetention"] = 50000 + if stream_upload: + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) options = { "processing": { @@ -595,10 +603,15 @@ def test_minidump_with_processing( assert event["exception"]["values"][0]["mechanism"]["type"] == "minidump" # Check information extracted from the minidump - assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) + if not stream_upload: + assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) # Check that the SDK name is correctly detected - assert event["sdk"]["name"] == "minidump.unknown" + assert ( + event["sdk"]["name"] == "minidump.upload" + if stream_upload + else "minidump.unknown" + ) if not use_objectstore: assert list(message["attachments"]) == [ @@ -1620,72 +1633,6 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry, assert envelope.items[0].payload.bytes == minidump_content -@pytest.mark.parametrize( - "magic,filename", - [ - pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"), - pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"), - pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"), - pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"), - ], -) -def test_minidump_objectstore_uploads_rejects_compressed( - mini_sentry, - relay, - magic, - filename, -): - """ - When streaming a minidump to objectstore, a compressed payload should be reject - (untill objectstore or minidump can handle them). - """ - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []).append( - "projects:relay-minidump-uploads" - ) - - relay = relay( - mini_sentry, - options={ - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - }, - }, - ) - - with pytest.raises(HTTPError) as exc_info: - relay.send_minidump( - project_id=project_id, - files=[(MINIDUMP_ATTACHMENT_NAME, filename, magic + b"\x00" * 32)], - ) - - assert exc_info.value.response.status_code == 400 - - assert mini_sentry.get_aggregated_outcomes() == [ - { - "category": 1, - "outcome": 3, - "quantity": 1, - "reason": "invalid_minidump", - }, - { - "category": 4, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - { - "category": 22, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - ] - - def test_minidump_upload_failure_bubbles_up(mini_sentry, relay): project_id = 42 minidump_content = b"MDMP content" From fac09cd4f8a6b73e535324657e42779bdb6b296e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 1 Jul 2026 12:03:31 +0200 Subject: [PATCH 06/13] simplify --- relay-server/src/endpoints/common.rs | 37 ++++++-------- relay-server/src/endpoints/minidump.rs | 61 ++++++++++++----------- relay-server/src/endpoints/playstation.rs | 10 ++-- relay-server/src/endpoints/upload.rs | 1 - relay-server/src/services/upload.rs | 54 ++++++++------------ 5 files changed, 71 insertions(+), 92 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index a2a6f3d5309..3e66c691074 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -24,7 +24,7 @@ use crate::service::ServiceState; use crate::services::buffer::{ProjectKeyPair, PushError}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome}; use crate::services::processor::{BucketSource, MetricData, ProcessMetrics}; -use crate::services::upload::{ContentEncoding, Create, ProjectContext, Stream, Upload}; +use crate::services::upload::{Create, ProjectContext, Stream, Upload}; use crate::statsd::{RelayCounters, RelayDistributions}; use crate::utils::{ self, ApiErrorResponse, BoundedStream, FormDataIter, MeteredStream, find_error_source, @@ -548,7 +548,8 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// Uploads the content of `field` to the objectstore and returns an [Item] with an /// [AttachmentPlaceholder] as payload. pub async fn upload_to_objectstore( - stream: StreamWithHeaders, + stream: S, + content_type: Option, mut item: Managed, config: &Config, project: ProjectContext, @@ -559,26 +560,25 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { - let res = - upload_to_objectstore_inner(stream, &mut item, config, project, upload, referrer).await; + let res = upload_to_objectstore_inner( + stream, + content_type, + &mut item, + config, + project, + upload, + referrer, + ) + .await; match res { Some(()) => Ok(item), None => Err(Outcome::Invalid(DiscardReason::Internal)).reject(&item), } } -/// A stream with metadata for request submission. -pub struct StreamWithHeaders { - /// The stream of data. - pub stream: S, - /// What the data is currently compressed as. - pub content_encoding: Option, - /// Content-type header to forward. - pub content_type: Option, -} - async fn upload_to_objectstore_inner( - stream: StreamWithHeaders, + stream: S, + content_type: Option, item: &mut Managed, config: &Config, project: ProjectContext, @@ -589,12 +589,6 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { - let StreamWithHeaders { - stream, - content_encoding, - content_type, - } = stream; - let stream: BoxStream<'static, io::Result> = Box::pin(stream.map_err(io::Error::other)); let stream = MeteredStream::new(stream, referrer); let stream = BoundedStream::new(stream, 1, config.max_upload_size()); @@ -619,7 +613,6 @@ where project, location, stream, - content_encoding, }) .await .inspect_err(|e| relay_log::debug!(error = ?e)) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index 777c5714cbc..a0fd8b771c8 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -24,9 +24,7 @@ use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT}; -use crate::endpoints::common::{ - self, BadStoreRequest, StreamWithHeaders, TextResponse, upload_to_objectstore, -}; +use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -34,7 +32,7 @@ use crate::middlewares; use crate::service::ServiceState; use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; use crate::services::projects::project::ProjectState; -use crate::services::upload::{ByteStream, ContentEncoding, ProjectContext, Upload}; +use crate::services::upload::{ByteStream, ProjectContext, Upload}; use crate::statsd::RelayCounters; use crate::utils::{self, AttachmentStrategy, read_bytes_into_item}; @@ -81,21 +79,21 @@ macro_rules! wrap_decode { /// /// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses /// one of the minidump container formats we support for inline uploads. -async fn decode_stream(stream: S) -> std::io::Result<(ByteStream, Option)> +async fn decode_stream(stream: S) -> std::io::Result where S: Stream> + Send + 'static, E: Into> + Send + 'static, { - use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; + use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder}; let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; let decoded = match Compression::from(&head) { - Compression::None => (stream.boxed(), None), - Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), - Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), - Compression::Xz => (wrap_decode!(stream, XzDecoder), None), - Compression::Bzip2 => (wrap_decode!(stream, BzDecoder), None), + Compression::None => stream.boxed(), + Compression::Zstd => wrap_decode!(stream, ZstdDecoder), + Compression::Gzip => wrap_decode!(stream, GzipDecoder), + Compression::Xz => wrap_decode!(stream, XzDecoder), + Compression::Bzip2 => wrap_decode!(stream, BzDecoder), }; Ok(decoded) } @@ -352,17 +350,20 @@ where E: Into> + Send + 'static, { if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { - let stream = StreamWithHeaders { + return upload_to_objectstore( stream, - content_encoding: None, content_type, - }; - return upload_to_objectstore(stream, item, config, project, upload, referrer) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); + item, + config, + project, + upload, + referrer, + ) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let (stream, content_encoding) = match decode_stream(stream).await { + let stream = match decode_stream(stream).await { Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); @@ -370,14 +371,17 @@ where } }; - let stream = StreamWithHeaders { + upload_to_objectstore( stream, - content_encoding, content_type, - }; - upload_to_objectstore(stream, item, config, project, upload, referrer) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) + item, + config, + project, + upload, + referrer, + ) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) } async fn multipart_to_items( @@ -527,17 +531,14 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let (stream, content_encoding) = decode_stream(request.into_body().into_data_stream()) + let stream = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; - let stream = StreamWithHeaders { - stream, - content_encoding, - content_type: Some(content_type.to_string()).filter(|s| !s.is_empty()), - }; + let content_type = Some(content_type.to_string()).filter(|s| !s.is_empty()); item = upload_to_objectstore( stream, + content_type, item, state.config(), upload_context.project, diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index cfc2b378637..d888b6d8da2 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -13,7 +13,7 @@ use relay_system::Addr; use serde::Serialize; use tower_http::limit::RequestBodyLimitLayer; -use crate::endpoints::common::{self, BadStoreRequest, StreamWithHeaders, TextResponse}; +use crate::endpoints::common::{self, BadStoreRequest, TextResponse}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -147,13 +147,9 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { match &self.upload_context { Some(upload_context) if self.infer_type(&field) != AttachmentType::Prosperodump => { let content_type = field.content_type().map(ToString::to_string); - let stream = StreamWithHeaders { - stream: field, - content_encoding: None, - content_type, - }; Ok(common::upload_to_objectstore( - stream, + field, + content_type, item, config, upload_context.project.clone(), diff --git a/relay-server/src/endpoints/upload.rs b/relay-server/src/endpoints/upload.rs index 16e7252b788..021a7bf51c9 100644 --- a/relay-server/src/endpoints/upload.rs +++ b/relay-server/src/endpoints/upload.rs @@ -326,7 +326,6 @@ async fn upload( project, location, stream, - content_encoding: None, // NOTE: we could pass through zstd here if we can skip `RequestDecompressionLayer`. }) .await??; diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index c26bfec41a5..79f320b245c 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -6,7 +6,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_compression::tokio::bufread::ZstdEncoder; +use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -146,18 +146,6 @@ pub struct Stream { pub location: SignedLocation, /// The body to be uploaded to objectstore, with length validation. pub stream: BoundedStream>, - /// The content encoding of the stream. - /// - /// If `Some`, the service treats the stream as already encoded. - /// If `None`, the service will apply zstd compression to the stream while uploading. - pub content_encoding: Option, -} - -/// Type of compression that both Relay and Objectstore support. -/// -/// This can be used to pass compressed streams through to Objectstore without decoding. -pub enum ContentEncoding { - Zstd, } impl FromMessage for Upload { @@ -305,16 +293,10 @@ impl Service { project, location, stream, - content_encoding, } = stream; match &self.backend { Backend::Upstream { addr } => { - let (request, rx) = UploadRequest::upload( - project, - location.try_to_uri()?, - stream, - content_encoding, - ); + let (request, rx) = UploadRequest::upload(project, location.try_to_uri()?, stream); addr.send(SendRequest(request)); let response = rx.await??; SignedLocation::try_from_response(response) @@ -694,7 +676,7 @@ enum RequestKind { Upload { uri: String, stream: TakeOnce>>, - content_encoding: Option, + encoding: HttpEncoding, }, } @@ -733,7 +715,6 @@ impl UploadRequest { project: ProjectContext, uri: String, stream: BoundedStream>, - content_encoding: Option, ) -> ( Self, oneshot::Receiver>, @@ -745,7 +726,7 @@ impl UploadRequest { kind: RequestKind::Upload { uri, stream: TakeOnce::new(stream), - content_encoding, + encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() }, sender, }, @@ -827,7 +808,7 @@ impl UpstreamRequest for UploadRequest { RequestKind::Upload { uri: _, stream, - content_encoding, + encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { relay_log::error!("upload request stream was already consumed"); @@ -835,13 +816,10 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let zstd_body = match content_encoding { - Some(ContentEncoding::Zstd) => reqwest::Body::wrap_stream(body), - None => reqwest::Body::wrap_stream(encode_body(body)), - }; + let body = encode_body(body, *encoding); + builder.content_encoding(*encoding); - builder.content_encoding(HttpEncoding::Zstd); - builder.body(zstd_body); + builder.body(reqwest::Body::wrap_stream(body)); } }; @@ -851,14 +829,26 @@ impl UpstreamRequest for UploadRequest { Ok(()) } + + fn configure(&mut self, config: &Config) { + if let RequestKind::Upload { encoding, .. } = &mut self.kind { + *encoding = config.http_encoding(); + } + } } -fn encode_body(stream: S) -> ByteStream +fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream where S: futures::Stream> + Send + 'static, { let reader = BufReader::new(StreamReader::new(stream)); - ReaderStream::new(ZstdEncoder::new(reader)).boxed() + match encoding { + HttpEncoding::Identity => ReaderStream::new(reader).boxed(), + HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), + HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), + HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), + HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), + } } #[cfg(test)] From b0a578aa312e195269167fe1899cc47b5e667379 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 1 Jul 2026 13:49:59 +0200 Subject: [PATCH 07/13] fix filename --- relay-server/src/endpoints/minidump.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index a0fd8b771c8..5b3a2501fde 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -335,11 +335,11 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> { } } -/// Wrapper around [`upload_to_objectstore`] that enforces that minidumps are not compressed. +/// Wrapper around [`upload_to_objectstore`] that decompresses minidumps if necessary. pub async fn upload_to_objectstore_checked( stream: S, content_type: Option, - item: Managed, + mut item: Managed, config: &Config, project: ProjectContext, upload: &Addr, @@ -371,6 +371,16 @@ where } }; + item.modify(|item, _| { + if let Some(filename) = item.filename() { + let new_filename = remove_container_extension(filename); + if new_filename != filename { + let new_filename = new_filename.to_owned(); + item.set_filename(new_filename); + } + } + }); + upload_to_objectstore( stream, content_type, From f5a67904dd55b5133ba6abfba15d6c631d4fb33b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 1 Jul 2026 14:06:37 +0200 Subject: [PATCH 08/13] fix: content-type --- relay-server/src/endpoints/common.rs | 2 +- relay-server/src/endpoints/minidump.rs | 2 +- relay-server/src/services/store.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 3e66c691074..d0c751da4e6 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -635,7 +635,7 @@ where let location = location.to_str().ok()?; let placeholder = serde_json::to_vec(&AttachmentPlaceholder { location, - content_type, + content_type: dbg!(content_type), }) .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index 5b3a2501fde..a2bf5c35926 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -383,7 +383,7 @@ where upload_to_objectstore( stream, - content_type, + Some(ContentType::Minidump.to_string()), item, config, project, diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index bf89161d89d..ae30de5d326 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -990,7 +990,7 @@ impl StoreService { id: Uuid::new_v4().to_string(), name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(), rate_limited: item.rate_limited(), - content_type: placeholder.content_type, + content_type: dbg!(placeholder.content_type), attachment_type: item.attachment_type().unwrap_or_default(), size: item.attachment_body_size(), retention_days, From 4f80af8bd25ef128057c54ca6d278322dbcde6dc Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 1 Jul 2026 14:18:49 +0200 Subject: [PATCH 09/13] fix --- relay-server/src/endpoints/common.rs | 6 +----- relay-server/src/services/store.rs | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index d0c751da4e6..00de250cb54 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -602,7 +602,6 @@ where }) .await .ok()? - .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let scoping = project.scoping; @@ -615,7 +614,6 @@ where stream, }) .await - .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = result @@ -629,15 +627,13 @@ where "multipart item upload failed", ); }) - .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = location.into_header_value().ok()?; let location = location.to_str().ok()?; let placeholder = serde_json::to_vec(&AttachmentPlaceholder { location, - content_type: dbg!(content_type), + content_type, }) - .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; item.modify(|inner, records| { diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index ae30de5d326..bf89161d89d 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -990,7 +990,7 @@ impl StoreService { id: Uuid::new_v4().to_string(), name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(), rate_limited: item.rate_limited(), - content_type: dbg!(placeholder.content_type), + content_type: placeholder.content_type, attachment_type: item.attachment_type().unwrap_or_default(), size: item.attachment_body_size(), retention_days, From 4b2b3c6bee60da97ccb6abd50302aceb979e8a43 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 2 Jul 2026 07:17:40 +0200 Subject: [PATCH 10/13] test: Add content_type --- CHANGELOG.md | 1 + relay-conventions/sentry-conventions | 2 +- tests/integration/test_minidump.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 013d35e3cd8..ede1fcdb10d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Unset segment info for web vital spans. ([#6042](https://github.com/getsentry/relay/pull/6042)) - Set sentry.trace.status on segment spans. ([#6140](https://github.com/getsentry/relay/pull/6140)) - Don't modify segment information for V2 web vital spans. ([#6160](https://github.com/getsentry/relay/pull/6160)) +- Support compressed minidumps when the `relay-minidump-uploads` is enabled. ([#6151](https://github.com/getsentry/relay/pull/6151)) **Internal**: diff --git a/relay-conventions/sentry-conventions b/relay-conventions/sentry-conventions index daee5ba1c58..989dc716d72 160000 --- a/relay-conventions/sentry-conventions +++ b/relay-conventions/sentry-conventions @@ -1 +1 @@ -Subproject commit daee5ba1c580ff841db03b0decfb1b0250ad1edf +Subproject commit 989dc716d72d64e8dd30a3085414e15d41d6fcf7 diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index be6447db33e..32927852af9 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -1147,6 +1147,7 @@ def test_minidump_objectstore_uploads( ) assert json.loads(minidump.payload.bytes) == { "location": DUMMY_UPLOAD_LOCATION, + "content_type": "application/x-dmp", } else: assert ( From f476b33de040ebbd3bffef553f9b08ca07938059 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 2 Jul 2026 10:28:07 +0200 Subject: [PATCH 11/13] Always overwrite content type for minidumps --- relay-server/src/endpoints/minidump.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index a2bf5c35926..d6d9d9ca846 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -526,7 +526,6 @@ async fn raw_minidump_to_item( request: Request, meta: &RequestMeta, state: &ServiceState, - content_type: RawContentType, upload_context: Option>, ) -> Result, BadStoreRequest> { debug_assert!(!matches!( @@ -545,10 +544,9 @@ async fn raw_minidump_to_item( .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; - let content_type = Some(content_type.to_string()).filter(|s| !s.is_empty()); item = upload_to_objectstore( stream, - content_type, + Some(ContentType::Minidump.to_string()), item, state.config(), upload_context.project, @@ -579,7 +577,7 @@ async fn items( request: Request, ) -> Result, BadStoreRequest> { let items = if MINIDUMP_RAW_CONTENT_TYPES.contains(&content_type.as_ref()) { - raw_minidump_to_item(request, meta, state, content_type, upload_context) + raw_minidump_to_item(request, meta, state, upload_context) .await? .map(|item, _| smallvec![item]) } else { From e213640c01e153f43a25668693f9bac3381bcb7a Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 2 Jul 2026 10:45:59 +0200 Subject: [PATCH 12/13] review --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ede1fcdb10d..63c8dbaa570 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ - Unset segment info for web vital spans. ([#6042](https://github.com/getsentry/relay/pull/6042)) - Set sentry.trace.status on segment spans. ([#6140](https://github.com/getsentry/relay/pull/6140)) - Don't modify segment information for V2 web vital spans. ([#6160](https://github.com/getsentry/relay/pull/6160)) -- Support compressed minidumps when the `relay-minidump-uploads` is enabled. ([#6151](https://github.com/getsentry/relay/pull/6151)) +- Support compressed minidumps when the `relay-minidump-uploads` feature is enabled. ([#6151](https://github.com/getsentry/relay/pull/6151)) **Internal**: From 1ea55e1c618f2ce65af18fc08a945c5ddc7d7794 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 2 Jul 2026 10:59:22 +0200 Subject: [PATCH 13/13] fix(minidump): Add test for faulty compression response code --- tests/integration/test_minidump.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 9dda28003ea..2f28b3b504d 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -1689,6 +1689,28 @@ def create(**opts): ] +def test_faulty_compression_stream(mini_sentry, relay, dummy_upload): + project_id = 42 + minidump_content = b"\x1f\x8b" + b"not a valid gzip stream" + + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) + + relay = relay(mini_sentry) + + response = relay.send_minidump( + project_id=project_id, + files=[(MINIDUMP_ATTACHMENT_NAME, "minidump.dmp.gz", minidump_content)], + raise_for_status=False, + ) + + assert response.status_code == 400 + assert response.json()["detail"] == "invalid compression" + assert mini_sentry.captured_envelopes.empty() + + def test_minidump_proxy_mode(mini_sentry, relay): project_id = 42 mini_sentry.add_full_project_config(project_id)