From b91b3160ba64407556b147167bc62c972988eaf7 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 24 Oct 2024 13:47:28 -0500 Subject: [PATCH 1/4] add boolean row selection implementation --- .../arrow/arrow_reader/boolean_selection.rs | 319 ++++++++++++++++++ parquet/src/arrow/arrow_reader/mod.rs | 2 + 2 files changed, 321 insertions(+) create mode 100644 parquet/src/arrow/arrow_reader/boolean_selection.rs diff --git a/parquet/src/arrow/arrow_reader/boolean_selection.rs b/parquet/src/arrow/arrow_reader/boolean_selection.rs new file mode 100644 index 000000000000..59cda6c1aeb9 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/boolean_selection.rs @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; + +use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer}; +use arrow_data::bit_iterator::BitIndexIterator; + +use super::{RowSelection, RowSelector}; + +/// A selection of rows, similar to [`RowSelection`], but based on a boolean array +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BooleanRowSelection { + selector: BooleanBuffer, +} + +impl BooleanRowSelection { + /// Create a new [`BooleanRowSelection] from a list of [`BooleanArray`]. + pub fn from_filters(filters: &[BooleanArray]) -> Self { + let arrays: Vec<&dyn Array> = filters.iter().map(|x| x as &dyn Array).collect(); + let result = arrow_select::concat::concat(&arrays).unwrap().into_data(); + let (boolean_array, _null) = BooleanArray::from(result).into_parts(); + BooleanRowSelection { + selector: boolean_array, + } + } + + /// Create a new [`BooleanRowSelection`] with all rows unselected + pub fn new_unselected(row_count: usize) -> Self { + let buffer = BooleanBuffer::new_unset(row_count); + + BooleanRowSelection { selector: buffer } + } + + /// Create a new [`BooleanRowSelection`] with all rows selected + pub fn new_selected(row_count: usize) -> Self { + let buffer = BooleanBuffer::new_set(row_count); + + BooleanRowSelection { selector: buffer } + } + + /// Returns a new [`BooleanRowSelection`] that selects the inverse of this [`BooleanRowSelection`]. + pub fn as_inverted(&self) -> Self { + let buffer = !&self.selector; + BooleanRowSelection { selector: buffer } + } + + /// Returns the number of rows in this [`BooleanRowSelection`]. + pub fn len(&self) -> usize { + self.selector.len() + } + + /// Returns the number of rows selected by this [`BooleanRowSelection`]. + pub fn row_count(&self) -> usize { + self.selector.count_set_bits() + } + + /// Create a new [`BooleanRowSelection`] from a list of consecutive ranges. + pub fn from_consecutive_ranges( + ranges: impl Iterator>, + total_rows: usize, + ) -> Self { + let mut buffer = BooleanBufferBuilder::new(total_rows); + let mut last_end = 0; + + for range in ranges { + let len = range.end - range.start; + if len == 0 { + continue; + } + + if range.start > last_end { + buffer.append_n(range.start - last_end, false); + } + buffer.append_n(len, true); + last_end = range.end; + } + + if last_end != total_rows { + buffer.append_n(total_rows - last_end, false); + } + + BooleanRowSelection { + selector: buffer.finish(), + } + } + + /// Compute the union of two [`BooleanRowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNN + /// + /// returned: NYYYYYNNYYNYN + #[must_use] + pub fn union(&self, other: &Self) -> Self { + // use arrow::compute::kernels::boolean::or; + + let union_selectors = &self.selector | &other.selector; + + BooleanRowSelection { + selector: union_selectors, + } + } + + /// Compute the intersection of two [`BooleanRowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNY + /// + /// returned: NNNNNNNNYYNYN + #[must_use] + pub fn intersection(&self, other: &Self) -> Self { + let intersection_selectors = &self.selector & &other.selector; + + BooleanRowSelection { + selector: intersection_selectors, + } + } + + /// Combines this [`BooleanRowSelection`] with another using logical AND on the selected bits. + /// + /// Unlike [`intersection`], the `other` [`BooleanRowSelection`] must have exactly as many set bits as `self`. + /// This method will keep only the bits in `self` that are also set in `other` + /// at the positions corresponding to `self`'s set bits. + pub fn and_then(&self, other: &Self) -> Self { + // Ensure that 'other' has exactly as many set bits as 'self' + debug_assert_eq!( + self.row_count(), + other.len(), + "The 'other' selection must have exactly as many set bits as 'self'." + ); + + if self.len() == other.len() { + // fast path if the two selections are the same length + // common if this is the first predicate + debug_assert_eq!(self.row_count(), self.len()); + return self.intersection(other); + } + + let mut buffer = MutableBuffer::from_len_zeroed(self.len()); + buffer.copy_from_slice(self.selector.values()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); + + // Create iterators for 'self' and 'other' bits + let mut other_bits = other.selector.iter(); + + for bit_idx in self.true_iter() { + let predicate = other_bits + .next() + .expect("Mismatch in set bits between self and other"); + if !predicate { + builder.set_bit(bit_idx, false); + } + } + + BooleanRowSelection { + selector: builder.finish(), + } + } + + /// Returns an iterator over the indices of the set bits in this [`BooleanRowSelection`] + pub fn true_iter(&self) -> BitIndexIterator<'_> { + self.selector.set_indices() + } + + /// Returns `true` if this [`BooleanRowSelection`] selects any rows + pub fn selects_any(&self) -> bool { + self.true_iter().next().is_some() + } + + /// Returns a new [`BooleanRowSelection`] that selects the rows in this [`BooleanRowSelection`] from `offset` to `offset + len` + pub fn slice(&self, offset: usize, len: usize) -> BooleanArray { + BooleanArray::new(self.selector.slice(offset, len), None) + } +} + +impl From> for BooleanRowSelection { + fn from(selection: Vec) -> Self { + let selection = RowSelection::from(selection); + RowSelection::into(selection) + } +} + +impl From for BooleanRowSelection { + fn from(selection: RowSelection) -> Self { + let total_rows = selection.row_count(); + let mut builder = BooleanBufferBuilder::new(total_rows); + + for selector in selection.iter() { + if selector.skip { + builder.append_n(selector.row_count, false); + } else { + builder.append_n(selector.row_count, true); + } + } + + BooleanRowSelection { + selector: builder.finish(), + } + } +} + +impl From<&BooleanRowSelection> for RowSelection { + fn from(selection: &BooleanRowSelection) -> Self { + let array = BooleanArray::new(selection.selector.clone(), None); + RowSelection::from_filters(&[array]) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + + use super::*; + + fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> BooleanArray { + let mut rng = rand::thread_rng(); + let bools: Vec = (0..total_rows) + .map(|_| rng.gen_bool(selection_ratio)) + .collect(); + BooleanArray::from(bools) + } + + #[test] + fn test_boolean_row_selection_round_trip() { + let total_rows = 1_000; + for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] { + let selection = generate_random_row_selection(total_rows, selection_ratio); + let boolean_selection = BooleanRowSelection::from_filters(&[selection]); + let row_selection = RowSelection::from(&boolean_selection); + let boolean_selection_again = row_selection.into(); + assert_eq!(boolean_selection, boolean_selection_again); + } + } + + #[test] + fn test_boolean_union_intersection() { + let total_rows = 1_000; + + let base_boolean_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection(total_rows, 0.1)]); + let base_row_selection = RowSelection::from(&base_boolean_selection); + for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] { + let boolean_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection( + total_rows, + selection_ratio, + )]); + let row_selection = RowSelection::from(&boolean_selection); + + let boolean_union = boolean_selection.union(&base_boolean_selection); + let row_union = row_selection.union(&base_row_selection); + assert_eq!(boolean_union, BooleanRowSelection::from(row_union)); + + let boolean_intersection = boolean_selection.intersection(&base_boolean_selection); + let row_intersection = row_selection.intersection(&base_row_selection); + assert_eq!( + boolean_intersection, + BooleanRowSelection::from(row_intersection) + ); + } + } + + #[test] + fn test_boolean_selection_and_then() { + // Initial mask: 001011010101 + let self_filters = vec![BooleanArray::from(vec![ + false, false, true, false, true, true, false, true, false, true, false, true, + ])]; + let self_selection = BooleanRowSelection::from_filters(&self_filters); + + // Predicate mask (only for selected bits): 001101 + let other_filters = vec![BooleanArray::from(vec![ + false, false, true, true, false, true, + ])]; + let other_selection = BooleanRowSelection::from_filters(&other_filters); + + let result = self_selection.and_then(&other_selection); + + // Expected result: 000001010001 + let expected_filters = vec![BooleanArray::from(vec![ + false, false, false, false, false, true, false, true, false, false, false, true, + ])]; + let expected_selection = BooleanRowSelection::from_filters(&expected_filters); + + assert_eq!(result, expected_selection); + } + + #[test] + #[should_panic( + expected = "The 'other' selection must have exactly as many set bits as 'self'." + )] + fn test_and_then_mismatched_set_bits() { + let self_filters = vec![BooleanArray::from(vec![true, true, false])]; + let self_selection = BooleanRowSelection::from_filters(&self_filters); + + // 'other' has only one set bit, but 'self' has two + let other_filters = vec![BooleanArray::from(vec![true, false, false])]; + let other_selection = BooleanRowSelection::from_filters(&other_filters); + + // This should panic + let _ = self_selection.and_then(&other_selection); + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d3709c03e99a..1b28126d20c6 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -38,6 +38,8 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +#[cfg(feature = "experimental")] +mod boolean_selection; mod filter; mod selection; pub mod statistics; From 1f017ff516b1d908cb41624eae992b7fb2f841e5 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 25 Oct 2024 10:03:48 -0500 Subject: [PATCH 2/4] add boolean row selection implementation --- parquet/Cargo.toml | 2 +- parquet/benches/row_selector.rs | 113 ++++++++++++------ .../arrow/arrow_reader/boolean_selection.rs | 6 +- parquet/src/arrow/arrow_reader/mod.rs | 15 ++- 4 files changed, 87 insertions(+), 49 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 32bc13b62a53..6d88a7351495 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -225,7 +225,7 @@ harness = false [[bench]] name = "row_selector" harness = false -required-features = ["arrow"] +required-features = ["arrow", "experimental"] [lib] bench = false diff --git a/parquet/benches/row_selector.rs b/parquet/benches/row_selector.rs index 32f0d6a56064..783a4644f549 100644 --- a/parquet/benches/row_selector.rs +++ b/parquet/benches/row_selector.rs @@ -17,7 +17,7 @@ use arrow_array::BooleanArray; use criterion::*; -use parquet::arrow::arrow_reader::RowSelection; +use parquet::arrow::arrow_reader::{BooleanRowSelection, RowSelection}; use rand::Rng; /// Generates a random RowSelection with a specified selection ratio. @@ -40,47 +40,86 @@ fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> Boo fn criterion_benchmark(c: &mut Criterion) { let total_rows = 300_000; - let selection_ratio = 1.0 / 3.0; + let selection_ratios = [0.000_01, 0.001, 0.1, 0.3]; - // Generate two random RowSelections with approximately 1/3 of the rows selected. - let row_selection_a = - RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]); - let row_selection_b = - RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]); + for ratio in selection_ratios { + let slice_selection_a = + RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]); + let slice_selection_b = + RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]); - // Benchmark the intersection of the two RowSelections. - c.bench_function("intersection", |b| { - b.iter(|| { - let intersection = row_selection_a.intersection(&row_selection_b); - criterion::black_box(intersection); - }) - }); + let boolean_selection_a = BooleanRowSelection::from(slice_selection_a.clone()); + let boolean_selection_b = BooleanRowSelection::from(slice_selection_b.clone()); - c.bench_function("union", |b| { - b.iter(|| { - let union = row_selection_a.union(&row_selection_b); - criterion::black_box(union); - }) - }); + // Benchmark the intersection of the two RowSelections. + c.bench_function(&format!("slice intersection {}", ratio), |b| { + b.iter(|| { + let intersection = slice_selection_a.intersection(&slice_selection_b); + criterion::black_box(intersection); + }) + }); - c.bench_function("from_filters", |b| { - let boolean_array = generate_random_row_selection(total_rows, selection_ratio); - b.iter(|| { - let array = boolean_array.clone(); - let selection = RowSelection::from_filters(&[array]); - criterion::black_box(selection); - }) - }); + c.bench_function(&format!("boolean intersection {}", ratio), |b| { + b.iter(|| { + let intersection = boolean_selection_a.intersection(&boolean_selection_b); + criterion::black_box(intersection); + }) + }); - c.bench_function("and_then", |b| { - let selected = row_selection_a.row_count(); - let sub_selection = - RowSelection::from_filters(&[generate_random_row_selection(selected, selection_ratio)]); - b.iter(|| { - let result = row_selection_a.and_then(&sub_selection); - criterion::black_box(result); - }) - }); + c.bench_function(&format!("slice union {}", ratio), |b| { + b.iter(|| { + let union = slice_selection_a.union(&slice_selection_b); + criterion::black_box(union); + }) + }); + + c.bench_function(&format!("boolean union {}", ratio), |b| { + b.iter(|| { + let union = boolean_selection_a.union(&boolean_selection_b); + criterion::black_box(union); + }) + }); + + c.bench_function(&format!("slice from_filters {}", ratio), |b| { + let boolean_array = generate_random_row_selection(total_rows, ratio); + b.iter(|| { + let array = boolean_array.clone(); + let selection = RowSelection::from_filters(&[array]); + criterion::black_box(selection); + }) + }); + + c.bench_function(&format!("boolean from_filters {}", ratio), |b| { + let boolean_array = generate_random_row_selection(total_rows, ratio); + b.iter(|| { + let array = boolean_array.clone(); + let selection = BooleanRowSelection::from_filters(&[array]); + criterion::black_box(selection); + }) + }); + + c.bench_function(&format!("slice and_then {}", ratio), |b| { + let selected = slice_selection_a.row_count(); + let sub_selection = + RowSelection::from_filters(&[generate_random_row_selection(selected, ratio)]); + b.iter(|| { + let result = slice_selection_a.and_then(&sub_selection); + criterion::black_box(result); + }) + }); + + c.bench_function(&format!("boolean and_then {}", ratio), |b| { + let selected = boolean_selection_a.row_count(); + let sub_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection( + selected, ratio, + )]); + b.iter(|| { + let result = boolean_selection_a.and_then(&sub_selection); + criterion::black_box(result); + }) + }); + } } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/arrow/arrow_reader/boolean_selection.rs b/parquet/src/arrow/arrow_reader/boolean_selection.rs index 59cda6c1aeb9..e76df2c29fc3 100644 --- a/parquet/src/arrow/arrow_reader/boolean_selection.rs +++ b/parquet/src/arrow/arrow_reader/boolean_selection.rs @@ -152,9 +152,9 @@ impl BooleanRowSelection { return self.intersection(other); } - let mut buffer = MutableBuffer::from_len_zeroed(self.len()); - buffer.copy_from_slice(self.selector.values()); - let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); + let mut buffer = MutableBuffer::from_len_zeroed(self.selector.inner().len()); + buffer.copy_from_slice(self.selector.inner().as_slice()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.selector.len()); // Create iterators for 'self' and 'other' bits let mut other_bits = other.selector.iter(); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b28126d20c6..dc8ac936866e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -20,14 +20,6 @@ use std::collections::VecDeque; use std::sync::Arc; -use arrow_array::cast::AsArray; -use arrow_array::Array; -use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; - pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -37,6 +29,13 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; +#[cfg(feature = "experimental")] +pub use boolean_selection::BooleanRowSelection; +pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use selection::{RowSelection, RowSelector}; #[cfg(feature = "experimental")] mod boolean_selection; From ae84b56d3dda4417702653f589f56ddadee45288 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 25 Oct 2024 10:11:18 -0500 Subject: [PATCH 3/4] update doc --- parquet/src/arrow/arrow_reader/boolean_selection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/boolean_selection.rs b/parquet/src/arrow/arrow_reader/boolean_selection.rs index e76df2c29fc3..712db9c7d753 100644 --- a/parquet/src/arrow/arrow_reader/boolean_selection.rs +++ b/parquet/src/arrow/arrow_reader/boolean_selection.rs @@ -134,7 +134,7 @@ impl BooleanRowSelection { /// Combines this [`BooleanRowSelection`] with another using logical AND on the selected bits. /// - /// Unlike [`intersection`], the `other` [`BooleanRowSelection`] must have exactly as many set bits as `self`. + /// Unlike intersection, the `other` [`BooleanRowSelection`] must have exactly as many set bits as `self`. /// This method will keep only the bits in `self` that are also set in `other` /// at the positions corresponding to `self`'s set bits. pub fn and_then(&self, other: &Self) -> Self { From fea1120673539c587cef48994e0bb976168d6030 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 25 Oct 2024 10:18:18 -0500 Subject: [PATCH 4/4] make clippy happy --- parquet/src/arrow/arrow_reader/boolean_selection.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/boolean_selection.rs b/parquet/src/arrow/arrow_reader/boolean_selection.rs index 712db9c7d753..34457e463e4f 100644 --- a/parquet/src/arrow/arrow_reader/boolean_selection.rs +++ b/parquet/src/arrow/arrow_reader/boolean_selection.rs @@ -60,11 +60,6 @@ impl BooleanRowSelection { BooleanRowSelection { selector: buffer } } - /// Returns the number of rows in this [`BooleanRowSelection`]. - pub fn len(&self) -> usize { - self.selector.len() - } - /// Returns the number of rows selected by this [`BooleanRowSelection`]. pub fn row_count(&self) -> usize { self.selector.count_set_bits() @@ -141,14 +136,14 @@ impl BooleanRowSelection { // Ensure that 'other' has exactly as many set bits as 'self' debug_assert_eq!( self.row_count(), - other.len(), + other.selector.len(), "The 'other' selection must have exactly as many set bits as 'self'." ); - if self.len() == other.len() { + if self.selector.len() == other.selector.len() { // fast path if the two selections are the same length // common if this is the first predicate - debug_assert_eq!(self.row_count(), self.len()); + debug_assert_eq!(self.row_count(), self.selector.len()); return self.intersection(other); }