Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions bindings/dotnet/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ pub fn parse_read_options(
let length = parse_u64(values, "length")?;
if offset > 0 || length.is_some() {
options.range = match validate_read_range_end(offset, length)? {
Some(end) => (offset..end).into(),
None => (offset..).into(),
Some(end) => opendal::raw::BytesRange::from_range(offset..end)
.map_err(OpenDALError::from_opendal_error)?,
None => opendal::raw::BytesRange::from_range(offset..)
.map_err(OpenDALError::from_opendal_error)?,
};
}

Expand Down
7 changes: 4 additions & 3 deletions bindings/java/src/async_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ fn intern_read(
let op_cloned = unsafe { &*op }.clone();

executor_or_default(env, executor)?.spawn(async move {
let mut read_op = op_cloned.read_with(&path_str);
read_op = read_op.range(range);
let result = read_op.await.map_err(Into::into);
let result = match op_cloned.read_with(&path_str).range(range) {
Ok(fut) => fut.await.map_err(Into::into),
Err(e) => Err(e.into()),
};

let mut env = unsafe { get_current_env() };
let result = result.and_then(|bs| bytes_to_jbytearray(&mut env, bs.to_bytes()));
Expand Down
4 changes: 3 additions & 1 deletion bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ fn make_read_options<'a>(
let length = convert::read_int64_field(env, options, "length")?;

Ok(opendal::options::ReadOptions {
range: convert::offset_length_to_range(offset, length)?.into(),
range: opendal::raw::BytesRange::from_range(convert::offset_length_to_range(
offset, length,
)?)?,
..Default::default()
})
}
Expand Down
3 changes: 2 additions & 1 deletion core/benches/vs_fs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use criterion::Criterion;
use opendal::Operator;
use opendal::blocking;
use opendal::options;
use opendal::raw::BytesRange;
use opendal::services;
use rand::prelude::*;

Expand Down Expand Up @@ -59,7 +60,7 @@ fn bench_vs_fs(c: &mut Criterion) {
.read_options(
&path,
options::ReadOptions {
range: (0..16 * 1024 * 1024).into(),
range: BytesRange::from_range(0..16 * 1024 * 1024).unwrap(),
..Default::default()
},
)
Expand Down
5 changes: 3 additions & 2 deletions core/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
//! ```no_run
//! use opendal_core::layers::HttpClientLayer;
//! use opendal_core::options;
//! use opendal_core::raw::BytesRange;
//! use opendal_core::raw::HttpClient;
//! use opendal_core::services;
//! use opendal_core::Operator;
Expand All @@ -130,15 +131,15 @@
//! // Read data from `hello.txt` with options.
//! let bs = op
//! .read_with("hello.txt")
//! .range(0..8 * 1024 * 1024)
//! .range(0..8 * 1024 * 1024)?
//! .chunk(1024 * 1024)
//! .concurrent(4)
//! .await?;
//!
//! // The same as:
//! let bs = op
//! .read_options("hello.txt", options::ReadOptions {
//! range: (0..8 * 1024 * 1024).into(),
//! range: BytesRange::from_range(0..8 * 1024 * 1024)?,
//! chunk: Some(1024 * 1024),
//! concurrent: 4,
//! ..Default::default()
Expand Down
43 changes: 32 additions & 11 deletions core/core/src/raw/http_util/bytes_content_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ impl BytesContentRange {
/// Update BytesContentRange with range.
///
/// The range is inclusive: `[start..=end]` as described in `content-range`.
pub fn with_range(mut self, start: u64, end: u64) -> Self {
pub fn with_range(mut self, start: u64, end: u64) -> Result<Self> {
if end < start {
return Err(Error::new(
ErrorKind::Unexpected,
format!("invalid BytesContentRange: end ({end}) < start ({start})"),
));
}
self.0 = Some(start);
self.1 = Some(end);
self
Ok(self)
}

/// Update BytesContentRange with size.
Expand Down Expand Up @@ -163,7 +169,7 @@ impl FromStr for BytesContentRange {
}
let start: u64 = v[0].parse().map_err(parse_int_error)?;
let end: u64 = v[1].parse().map_err(parse_int_error)?;
let mut bcr = BytesContentRange::default().with_range(start, end);
let mut bcr = BytesContentRange::default().with_range(start, end)?;

// Handle size part first.
if s[1] != "*" {
Expand All @@ -184,13 +190,13 @@ mod tests {
(
"range start with unknown size",
"bytes 123-123/*",
BytesContentRange::default().with_range(123, 123),
BytesContentRange::default().with_range(123, 123)?,
),
(
"range start with known size",
"bytes 123-123/1024",
BytesContentRange::default()
.with_range(123, 123)
.with_range(123, 123)?
.with_size(1024),
),
(
Expand All @@ -210,30 +216,45 @@ mod tests {
}

#[test]
fn test_bytes_content_range_to_string() {
fn test_bytes_content_range_to_string() -> Result<()> {
let h = BytesContentRange::default().with_size(1024);
assert_eq!(h.to_string(), "*/1024");

let h = BytesContentRange::default().with_range(0, 1023);
let h = BytesContentRange::default().with_range(0, 1023)?;
assert_eq!(h.to_string(), "0-1023/*");

let h = BytesContentRange::default()
.with_range(0, 1023)
.with_range(0, 1023)?
.with_size(1024);
assert_eq!(h.to_string(), "0-1023/1024");

Ok(())
}

#[test]
fn test_bytes_content_range_to_header() {
fn test_bytes_content_range_to_header() -> Result<()> {
let h = BytesContentRange::default().with_size(1024);
assert_eq!(h.to_header(), "bytes */1024");

let h = BytesContentRange::default().with_range(0, 1023);
let h = BytesContentRange::default().with_range(0, 1023)?;
assert_eq!(h.to_header(), "bytes 0-1023/*");

let h = BytesContentRange::default()
.with_range(0, 1023)
.with_range(0, 1023)?
.with_size(1024);
assert_eq!(h.to_header(), "bytes 0-1023/1024");

Ok(())
}

#[test]
fn test_bytes_content_range_inverted_range_returns_error() {
    // An end offset that lies before the start offset must be rejected
    // with a message naming both bounds.
    let outcome = BytesContentRange::default().with_range(100, 50);
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid BytesContentRange: end (50) < start (100)"));
}
}
89 changes: 68 additions & 21 deletions core/core/src/raw/http_util/bytes_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct BytesRange(
impl BytesRange {
/// Create a new `BytesRange`
///
/// It better to use `BytesRange::from(1024..2048)` to construct.
/// It is better to use `BytesRange::from_range(1024..2048)` to construct.
///
/// # Note
///
Expand All @@ -73,12 +73,19 @@ impl BytesRange {

/// Advance the range by `n` bytes.
///
/// # Panics
///
/// Panic if input `n` is larger than the size of the range.
pub fn advance(&mut self, n: u64) {
/// Returns an error if `n` is larger than the size of the range.
pub fn advance(&mut self, n: u64) -> Result<()> {
if let Some(size) = self.1 {
if n > size {
return Err(Error::new(
ErrorKind::Unexpected,
format!("cannot advance BytesRange by {n} bytes, only {size} bytes left"),
));
}
}
self.0 += n;
self.1 = self.1.map(|size| size - n);
Ok(())
}

/// Check if this range is full of this content.
Expand Down Expand Up @@ -175,28 +182,44 @@ impl FromStr for BytesRange {
// <range-start>-<range-end>
let start: u64 = v[0].parse().map_err(parse_int_error)?;
let end: u64 = v[1].parse().map_err(parse_int_error)?;
Ok(BytesRange::new(start, Some(end - start + 1)))
BytesRange::from_range(start..=end)
}
}
}

impl<T> From<T> for BytesRange
where
T: RangeBounds<u64>,
{
fn from(range: T) -> Self {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very large change, as many of our APIs depend on this.

I also think we don't need to change this. The Rust API is clear that "It is empty if start >= end."

We can follow the same pattern. Downstream callers can check and return empty results.

impl BytesRange {
/// Create a `BytesRange` from any `RangeBounds<u64>`.
///
/// Returns an error if the end bound is before the start bound.
pub fn from_range(range: impl RangeBounds<u64>) -> Result<Self> {
let offset = match range.start_bound().cloned() {
Bound::Included(n) => n,
Bound::Excluded(n) => n + 1,
Bound::Unbounded => 0,
};
let size = match range.end_bound().cloned() {
Bound::Included(n) => Some(n + 1 - offset),
Bound::Excluded(n) => Some(n - offset),
Bound::Included(n) => {
if n + 1 < offset {
return Err(Error::new(
ErrorKind::Unexpected,
format!("invalid range: inclusive end ({n}) is before start ({offset})"),
));
}
Some(n + 1 - offset)
}
Bound::Excluded(n) => {
if n < offset {
return Err(Error::new(
ErrorKind::Unexpected,
format!("invalid range: exclusive end ({n}) is before start ({offset})"),
));
}
Some(n - offset)
}
Bound::Unbounded => None,
};

BytesRange(offset, size)
Ok(BytesRange(offset, size))
}
}

Expand Down Expand Up @@ -240,13 +263,21 @@ mod tests {
}

#[test]
fn test_bytes_range_from_range_bounds() {
assert_eq!(BytesRange::new(0, None), BytesRange::from(..));
assert_eq!(BytesRange::new(10, None), BytesRange::from(10..));
assert_eq!(BytesRange::new(0, Some(11)), BytesRange::from(..=10));
assert_eq!(BytesRange::new(0, Some(10)), BytesRange::from(..10));
assert_eq!(BytesRange::new(10, Some(10)), BytesRange::from(10..20));
assert_eq!(BytesRange::new(10, Some(11)), BytesRange::from(10..=20));
fn test_bytes_range_from_range_bounds() -> Result<()> {
assert_eq!(BytesRange::new(0, None), BytesRange::from_range(..)?);
assert_eq!(BytesRange::new(10, None), BytesRange::from_range(10..)?);
assert_eq!(BytesRange::new(0, Some(11)), BytesRange::from_range(..=10)?);
assert_eq!(BytesRange::new(0, Some(10)), BytesRange::from_range(..10)?);
assert_eq!(
BytesRange::new(10, Some(10)),
BytesRange::from_range(10..20)?
);
assert_eq!(
BytesRange::new(10, Some(11)),
BytesRange::from_range(10..=20)?
);

Ok(())
}

#[test]
Expand All @@ -270,4 +301,20 @@ mod tests {

Ok(())
}

#[test]
fn test_bytes_range_from_str_inverted_returns_error() {
    // A range header whose end precedes its start must fail to parse.
    let header = "bytes=100-50";
    let parsed = header.parse::<BytesRange>();
    assert!(parsed.is_err(), "inverted range header should return Err");
}

#[test]
fn test_bytes_range_advance_beyond_size_returns_error() {
    // Only 3 bytes remain in the range, so advancing by 5 must report an
    // error rather than underflowing the remaining size.
    let mut range = BytesRange::new(0, Some(3));
    let message = range.advance(5).unwrap_err().to_string();
    assert!(message.contains("cannot advance BytesRange by 5 bytes, only 3 bytes left"));
}
}
10 changes: 10 additions & 0 deletions core/core/src/types/context/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ impl ReadContext {
}
};

// Reject inverted ranges up front instead of handing back a `start..end`
// whose start lies after its end.
if start > end {
    return Err(Error::new(
        ErrorKind::RangeNotSatisfied,
        // `format!` is required here: a bare string literal would leave the
        // `{start}` / `{end}` placeholders uninterpolated in the message.
        format!("read range is invalid: start ({start}) is after end ({end})"),
    )
    .with_operation("ReadContext::parse_into_range")
    .with_context("start", start.to_string())
    .with_context("end", end.to_string()));
}

Ok(start..end)
}
}
Expand Down
10 changes: 6 additions & 4 deletions core/core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ use crate::*;
///
/// ```no_run
/// use opendal_core::options;
/// use opendal_core::raw::BytesRange;
/// use opendal_core::services;
/// use opendal_core::Operator;
/// use opendal_core::Result;
Expand All @@ -124,15 +125,15 @@ use crate::*;
/// // Read data from `hello.txt` with options.
/// let bs = op
/// .read_with("hello.txt")
/// .range(0..8 * 1024 * 1024)
/// .range(0..8 * 1024 * 1024)?
/// .chunk(1024 * 1024)
/// .concurrent(4)
/// .await?;
///
/// // The same as:
/// let bs = op
/// .read_options("hello.txt", options::ReadOptions {
/// range: (0..8 * 1024 * 1024).into(),
/// range: BytesRange::from_range(0..8 * 1024 * 1024)?,
/// chunk: Some(1024 * 1024),
/// concurrent: 4,
/// ..Default::default()
Expand Down Expand Up @@ -469,7 +470,7 @@ impl Operator {
/// # use opendal_core::Result;
/// # use opendal_core::Operator;
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.read_with("path/to/file").range(0..10).await?;
/// let bs = op.read_with("path/to/file").range(0..10)?.await?;
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -500,10 +501,11 @@ impl Operator {
/// # use opendal_core::Result;
/// # use opendal_core::Operator;
/// use opendal_core::options;
/// use opendal_core::raw::BytesRange;
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op
/// .read_options("path/to/file", options::ReadOptions {
/// range: (0..10).into(),
/// range: BytesRange::from_range(0..10)?,
/// ..Default::default()
/// })
/// .await?;
Expand Down
8 changes: 4 additions & 4 deletions core/core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
/// # use opendal_core::Operator;
/// # use futures::TryStreamExt;
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.read_with("path/to/file").range(0..1024).await?;
/// let bs = op.read_with("path/to/file").range(0..1024)?.await?;
/// # Ok(())
/// # }
/// ```
pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
self.args.range = range.into();
self
pub fn range(mut self, range: impl RangeBounds<u64>) -> Result<Self> {
self.args.range = BytesRange::from_range(range)?;
Ok(self)
}

/// Set `concurrent` for the reader.
Expand Down
Loading
Loading