Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ParquetError::NeedMoreData mark ParquetError as non_exhaustive #6630

Merged
merged 5 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
return Err(ParquetError::EOF(format!(
"file size of {} is less than footer + metadata {}",
file_size,
length + 8
length + FOOTER_SIZE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

)));
}

Expand Down
5 changes: 5 additions & 0 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow_schema::ArrowError;
// Note: we don't implement PartialEq as the semantics for the
// external variant are not well defined (#4469)
#[derive(Debug)]
#[non_exhaustive]
pub enum ParquetError {
/// General Parquet error.
/// Returned when code violates normal workflow of working with Parquet files.
Expand All @@ -47,6 +48,9 @@ pub enum ParquetError {
IndexOutOfBound(usize, usize),
/// An external error variant
External(Box<dyn Error + Send + Sync>),
/// Returned when a function needs more data to complete properly. The `usize` field indicates
/// the total number of bytes required, not the number of additional bytes.
NeedMoreData(usize),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also mark ParquetError non exhaustive at the same time (so future additions won't be breaking API changes)?

Like this:

#[non_exhaustive]

}

impl std::fmt::Display for ParquetError {
Expand All @@ -63,6 +67,7 @@ impl std::fmt::Display for ParquetError {
write!(fmt, "Index {index} out of bound: {bound}")
}
ParquetError::External(e) => write!(fmt, "External: {e}"),
ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
}
}
}
Expand Down
101 changes: 73 additions & 28 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ impl ParquetMetaDataReader {
///
/// # Errors
///
/// This function will return [`ParquetError::IndexOutOfBound`] in the event `reader` does not
/// provide enough data to fully parse the metadata (see example below).
/// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
/// provide enough data to fully parse the metadata (see example below). The returned error
/// will be populated with a `usize` field indicating the number of bytes required from the
/// tail of the file to completely parse the requested metadata.
///
/// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
///
Expand All @@ -192,27 +194,58 @@ impl ParquetMetaDataReader {
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// let file = open_parquet_file("some_path.parquet");
/// let len = file.len() as usize;
/// let bytes = get_bytes(&file, 1000..len);
/// // Speculatively read 1 kilobyte from the end of the file
/// let bytes = get_bytes(&file, len - 1024..len);
/// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
/// match reader.try_parse_sized(&bytes, len) {
/// Ok(_) => (),
/// Err(ParquetError::IndexOutOfBound(needed, _)) => {
/// Err(ParquetError::NeedMoreData(needed)) => {
/// // Read the needed number of bytes from the end of the file
/// let bytes = get_bytes(&file, len - needed..len);
/// reader.try_parse_sized(&bytes, len).unwrap();
/// }
/// _ => panic!("unexpected error")
/// }
/// let metadata = reader.finish().unwrap();
/// ```
///
/// Note that it is possible for the file metadata to be completely read, but there are
/// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
/// to test for this. In the event the file metadata is present, re-parsing of the file
/// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # use parquet::errors::ParquetError;
/// # use crate::parquet::file::reader::Length;
/// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// let file = open_parquet_file("some_path.parquet");
/// let len = file.len() as usize;
/// // Speculatively read 1 kilobyte from the end of the file
/// let mut bytes = get_bytes(&file, len - 1024..len);
/// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
/// // Loop until `bytes` is large enough
/// loop {
/// match reader.try_parse_sized(&bytes, len) {
/// Ok(_) => break,
/// Err(ParquetError::NeedMoreData(needed)) => {
/// // Read the needed number of bytes from the end of the file
/// bytes = get_bytes(&file, len - needed..len);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// // If file metadata was read only read page indexes, otherwise continue loop
/// if reader.has_metadata() {
/// reader.read_page_indexes_sized(&bytes, len);
/// break;
/// }
/// }
/// _ => panic!("unexpected error")
/// }
/// }
/// let metadata = reader.finish().unwrap();
/// ```
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
self.metadata = match self.parse_metadata(reader) {
Ok(metadata) => Some(metadata),
// FIXME: throughout this module ParquetError::IndexOutOfBound is used to indicate the
// need for more data. This is not it's intended use. The plan is to add a NeedMoreData
// value to the enum, but this would be a breaking change. This will be done as
// 54.0.0 draws nearer.
// https://github.com/apache/arrow-rs/issues/6447
Err(ParquetError::IndexOutOfBound(needed, _)) => {
Err(ParquetError::NeedMoreData(needed)) => {
// If reader is the same length as `file_size` then presumably there is no more to
// read, so return an EOF error.
if file_size == reader.len() as usize || needed > file_size {
Expand All @@ -223,7 +256,7 @@ impl ParquetMetaDataReader {
));
} else {
// Ask for a larger buffer
return Err(ParquetError::IndexOutOfBound(needed, file_size));
return Err(ParquetError::NeedMoreData(needed));
}
}
Err(e) => return Err(e),
Expand All @@ -246,7 +279,8 @@ impl ParquetMetaDataReader {
/// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
/// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
/// a [`Bytes`] struct containing the tail of the file).
/// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
/// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
/// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
pub fn read_page_indexes_sized<R: ChunkReader>(
&mut self,
reader: &R,
Expand Down Expand Up @@ -285,10 +319,7 @@ impl ParquetMetaDataReader {
));
} else {
// Ask for a larger buffer
return Err(ParquetError::IndexOutOfBound(
file_size - range.start,
file_size,
));
return Err(ParquetError::NeedMoreData(file_size - range.start));
}
}

Expand Down Expand Up @@ -484,10 +515,7 @@ impl ParquetMetaDataReader {
// check file is large enough to hold footer
let file_size = chunk_reader.len();
if file_size < (FOOTER_SIZE as u64) {
return Err(ParquetError::IndexOutOfBound(
FOOTER_SIZE,
file_size as usize,
));
return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
}

let mut footer = [0_u8; 8];
Expand All @@ -500,10 +528,7 @@ impl ParquetMetaDataReader {
self.metadata_size = Some(footer_metadata_len);

if footer_metadata_len > file_size as usize {
return Err(ParquetError::IndexOutOfBound(
footer_metadata_len,
file_size as usize,
));
return Err(ParquetError::NeedMoreData(footer_metadata_len));
}

let start = file_size - footer_metadata_len as u64;
Expand Down Expand Up @@ -682,7 +707,7 @@ mod tests {
let err = ParquetMetaDataReader::new()
.parse_metadata(&test_file)
.unwrap_err();
assert!(matches!(err, ParquetError::IndexOutOfBound(8, _)));
assert!(matches!(err, ParquetError::NeedMoreData(8)));
}

#[test]
Expand All @@ -701,7 +726,7 @@ mod tests {
let err = ParquetMetaDataReader::new()
.parse_metadata(&test_file)
.unwrap_err();
assert!(matches!(err, ParquetError::IndexOutOfBound(263, _)));
assert!(matches!(err, ParquetError::NeedMoreData(263)));
}

#[test]
Expand Down Expand Up @@ -794,7 +819,7 @@ mod tests {
// should fail
match reader.try_parse_sized(&bytes, len).unwrap_err() {
// expected error, try again with provided bounds
ParquetError::IndexOutOfBound(needed, _) => {
ParquetError::NeedMoreData(needed) => {
let bytes = bytes_for_range(len - needed..len);
reader.try_parse_sized(&bytes, len).unwrap();
let metadata = reader.finish().unwrap();
Expand All @@ -804,6 +829,26 @@ mod tests {
_ => panic!("unexpected error"),
};

// not enough for file metadata, but keep trying until page indexes are read
let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
let mut bytes = bytes_for_range(452505..len);
loop {
match reader.try_parse_sized(&bytes, len) {
Ok(_) => break,
Err(ParquetError::NeedMoreData(needed)) => {
bytes = bytes_for_range(len - needed..len);
if reader.has_metadata() {
reader.read_page_indexes_sized(&bytes, len).unwrap();
break;
}
}
_ => panic!("unexpected error"),
}
}
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());

// not enough for page index but lie about file size
let bytes = bytes_for_range(323584..len);
let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
Expand All @@ -818,7 +863,7 @@ mod tests {
// should fail
match reader.try_parse_sized(&bytes, len).unwrap_err() {
// expected error, try again with provided bounds
ParquetError::IndexOutOfBound(needed, _) => {
ParquetError::NeedMoreData(needed) => {
let bytes = bytes_for_range(len - needed..len);
reader.try_parse_sized(&bytes, len).unwrap();
reader.finish().unwrap();
Expand Down
Loading