From c597e4a126ef165a61c0e6092822b0ce0b97f4f4 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 25 Oct 2024 10:03:48 -0500 Subject: [PATCH] add boolean selection --- parquet/Cargo.toml | 101 ++++++++++++---- parquet/benches/row_selector.rs | 113 ++++++++++++------ .../arrow/arrow_reader/boolean_selection.rs | 6 +- parquet/src/arrow/arrow_reader/mod.rs | 15 ++- 4 files changed, 167 insertions(+), 68 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 32bc13b62a53..069c154eb2ef 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -29,10 +29,14 @@ edition = { workspace = true } rust-version = "1.70.0" [target.'cfg(target_arch = "wasm32")'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } +ahash = { version = "0.8", default-features = false, features = [ + "compile-time-rng", +] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } [dependencies] arrow-array = { workspace = true, optional = true } @@ -49,25 +53,53 @@ object_store = { version = "0.11.0", default-features = false, optional = true } bytes = { version = "1.1", default-features = false, features = ["std"] } thrift = { version = "0.17", default-features = false } snap = { version = "1.0", default-features = false, optional = true } -brotli = { version = "7.0", default-features = false, features = ["std"], optional = true } -flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } +brotli = { version = "7.0", default-features = false, features = [ + "std", +], optional = true } +flate2 = { version = "1.0", default-features = false, features = [ + "rust_backend", +], optional = true } +lz4_flex = { version = "0.11", default-features = false, features = [ + "std", + "frame", +], optional = true } zstd = { version = "0.13", optional = true, default-features = false } chrono = { workspace = true } num = { version = "0.4", default-features = false } num-bigint = { version = "0.4", default-features = false } -base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true } -clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true } -serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } -serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true } +base64 = { version = "0.22", default-features = false, features = [ + "std", +], optional = true } +clap = { version = "4.1", default-features = false, features = [ + "std", + "derive", + "env", + "help", + "error-context", + "usage", +], optional = true } +serde = { version = "1.0", default-features = false, features = [ + "derive", +], optional = true } +serde_json = { version = "1.0", default-features = false, features = [ + "std", +], optional = true } seq-macro = { version = "0.3", default-features = false } -futures = { version = "0.3", default-features = false, features = ["std"], optional = true } -tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] } +futures = { version = "0.3", default-features = false, features = [ + "std", +], optional = true } +tokio = { version = "1.0", optional = true, default-features = false, features = [ + "macros", + "rt", + "io-util", +] } hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } -sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] } +sysinfo = { version = "0.32.0", optional = true, default-features = false, features = [ + "system", +] } crc32fast = { version = "1.4.2", optional = true, default-features = false } [dev-dependencies] @@ -76,14 +108,34 @@ criterion = { version = "0.5", default-features = false } snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } brotli = { version = "7.0", default-features = false, features = ["std"] } -flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } +flate2 = { version = "1.0", default-features = false, features = [ + "rust_backend", +] } +lz4_flex = { version = "0.11", default-features = false, features = [ + "std", + "frame", +] } zstd = { version = "0.13", default-features = false } serde_json = { version = "1.0", features = ["std"], default-features = false } -arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } -tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } -rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } -object_store = { version = "0.11.0", default-features = false, features = ["azure"] } +arrow = { workspace = true, features = [ + "ipc", + "test_utils", + "prettyprint", + "json", +] } +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt", + "io-util", + "fs", +] } +rand = { version = "0.8", default-features = false, features = [ + "std", + "std_rng", +] } +object_store = { version = "0.11.0", default-features = false, features = [ + "azure", +] } # TODO: temporary to fix parquet wasm build # upstream issue: https://github.com/gyscos/zstd-rs/issues/269 @@ -101,7 +153,16 @@ default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs -arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"] +arrow = [ + "base64", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "arrow-select", + "arrow-ipc", +] # Enable CLI tools cli = ["json", "base64", "clap", "arrow-csv", "serde"] # Enable JSON APIs @@ -225,7 +286,7 @@ harness = false [[bench]] name = "row_selector" harness = false -required-features = ["arrow"] +required-features = ["arrow", "experimental"] [lib] bench = false diff --git a/parquet/benches/row_selector.rs b/parquet/benches/row_selector.rs index 32f0d6a56064..783a4644f549 100644 --- a/parquet/benches/row_selector.rs +++ b/parquet/benches/row_selector.rs @@ -17,7 +17,7 @@ use arrow_array::BooleanArray; use criterion::*; -use parquet::arrow::arrow_reader::RowSelection; +use parquet::arrow::arrow_reader::{BooleanRowSelection, RowSelection}; use rand::Rng; /// Generates a random RowSelection with a specified selection ratio. @@ -40,47 +40,86 @@ fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> Boo fn criterion_benchmark(c: &mut Criterion) { let total_rows = 300_000; - let selection_ratio = 1.0 / 3.0; + let selection_ratios = [0.000_01, 0.001, 0.1, 0.3]; - // Generate two random RowSelections with approximately 1/3 of the rows selected. - let row_selection_a = - RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]); - let row_selection_b = - RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]); + for ratio in selection_ratios { + let slice_selection_a = + RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]); + let slice_selection_b = + RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]); - // Benchmark the intersection of the two RowSelections. - c.bench_function("intersection", |b| { - b.iter(|| { - let intersection = row_selection_a.intersection(&row_selection_b); - criterion::black_box(intersection); - }) - }); + let boolean_selection_a = BooleanRowSelection::from(slice_selection_a.clone()); + let boolean_selection_b = BooleanRowSelection::from(slice_selection_b.clone()); - c.bench_function("union", |b| { - b.iter(|| { - let union = row_selection_a.union(&row_selection_b); - criterion::black_box(union); - }) - }); + // Benchmark the intersection of the two RowSelections. + c.bench_function(&format!("slice intersection {}", ratio), |b| { + b.iter(|| { + let intersection = slice_selection_a.intersection(&slice_selection_b); + criterion::black_box(intersection); + }) + }); - c.bench_function("from_filters", |b| { - let boolean_array = generate_random_row_selection(total_rows, selection_ratio); - b.iter(|| { - let array = boolean_array.clone(); - let selection = RowSelection::from_filters(&[array]); - criterion::black_box(selection); - }) - }); + c.bench_function(&format!("boolean intersection {}", ratio), |b| { + b.iter(|| { + let intersection = boolean_selection_a.intersection(&boolean_selection_b); + criterion::black_box(intersection); + }) + }); - c.bench_function("and_then", |b| { - let selected = row_selection_a.row_count(); - let sub_selection = - RowSelection::from_filters(&[generate_random_row_selection(selected, selection_ratio)]); - b.iter(|| { - let result = row_selection_a.and_then(&sub_selection); - criterion::black_box(result); - }) - }); + c.bench_function(&format!("slice union {}", ratio), |b| { + b.iter(|| { + let union = slice_selection_a.union(&slice_selection_b); + criterion::black_box(union); + }) + }); + + c.bench_function(&format!("boolean union {}", ratio), |b| { + b.iter(|| { + let union = boolean_selection_a.union(&boolean_selection_b); + criterion::black_box(union); + }) + }); + + c.bench_function(&format!("slice from_filters {}", ratio), |b| { + let boolean_array = generate_random_row_selection(total_rows, ratio); + b.iter(|| { + let array = boolean_array.clone(); + let selection = RowSelection::from_filters(&[array]); + criterion::black_box(selection); + }) + }); + + c.bench_function(&format!("boolean from_filters {}", ratio), |b| { + let boolean_array = generate_random_row_selection(total_rows, ratio); + b.iter(|| { + let array = boolean_array.clone(); + let selection = BooleanRowSelection::from_filters(&[array]); + criterion::black_box(selection); + }) + }); + + c.bench_function(&format!("slice and_then {}", ratio), |b| { + let selected = slice_selection_a.row_count(); + let sub_selection = + RowSelection::from_filters(&[generate_random_row_selection(selected, ratio)]); + b.iter(|| { + let result = slice_selection_a.and_then(&sub_selection); + criterion::black_box(result); + }) + }); + + c.bench_function(&format!("boolean and_then {}", ratio), |b| { + let selected = boolean_selection_a.row_count(); + let sub_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection( + selected, ratio, + )]); + b.iter(|| { + let result = boolean_selection_a.and_then(&sub_selection); + criterion::black_box(result); + }) + }); + } } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/arrow/arrow_reader/boolean_selection.rs b/parquet/src/arrow/arrow_reader/boolean_selection.rs index 59cda6c1aeb9..e76df2c29fc3 100644 --- a/parquet/src/arrow/arrow_reader/boolean_selection.rs +++ b/parquet/src/arrow/arrow_reader/boolean_selection.rs @@ -152,9 +152,9 @@ impl BooleanRowSelection { return self.intersection(other); } - let mut buffer = MutableBuffer::from_len_zeroed(self.len()); - buffer.copy_from_slice(self.selector.values()); - let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); + let mut buffer = MutableBuffer::from_len_zeroed(self.selector.inner().len()); + buffer.copy_from_slice(self.selector.inner().as_slice()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.selector.len()); // Create iterators for 'self' and 'other' bits let mut other_bits = other.selector.iter(); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b28126d20c6..dc8ac936866e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -20,14 +20,6 @@ use std::collections::VecDeque; use std::sync::Arc; -use arrow_array::cast::AsArray; -use arrow_array::Array; -use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; - pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -37,6 +29,13 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; +#[cfg(feature = "experimental")] +pub use boolean_selection::BooleanRowSelection; +pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use selection::{RowSelection, RowSelector}; #[cfg(feature = "experimental")] mod boolean_selection;