
Improve ability of FlightDataEncoder to respect max_flight_data_size for certain data types (strings, dictionaries, etc) #3478

Open
alamb opened this issue Jan 6, 2023 · 3 comments · May be fixed by #6690

alamb commented Jan 6, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Some gRPC implementations, such as the golang one, have a default max message size that is "relatively small" (4MB), and clients will generate errors if they receive larger messages.

The FlightDataEncoder has a mechanism (link) to try to avoid this problem by heuristically slicing RecordBatches into smaller parts to limit their size. This works well for primitive arrays but does not work well for other cases, as we have found upstream in IOx:

  1. DataType::Utf8 (only the offsets are sliced; the underlying string data is not sliced -- see the sketch after this list)
  2. Dictionaries (the dictionary itself is not changed, so if the dictionary is large it will be repeatedly sent) -- this will become an issue after Support DictionaryArrays in Arrow Flight #3389
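
As a rough illustration of the Utf8 case (a minimal sketch assuming the current `arrow-rs` `Array`/`ArrayData` API; the row and byte counts are made up): slicing a `StringArray` adjusts which offsets are visible, but the sliced array still references the entire values buffer in memory.

```rust
use arrow_array::{Array, StringArray};

fn main() {
    // 1000 rows of ~1KB each => roughly 1MB of string data.
    let array = StringArray::from_iter_values((0..1000).map(|_| "x".repeat(1024)));

    // Slice off just the first 10 rows.
    let sliced = array.slice(0, 10);

    // The offsets are adjusted, but the values buffer is untouched:
    // buffers()[0] holds the offsets, buffers()[1] holds the string bytes.
    let full_values = array.to_data().buffers()[1].len();
    let sliced_values = sliced.to_data().buffers()[1].len();
    assert_eq!(full_values, sliced_values); // slicing dropped no string bytes
    println!("values buffer: full={full_values} bytes, sliced={sliced_values} bytes");
}
```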

Lists, structs, and other nested types probably suffer from similar issues with maximum message sizes.

Of course, the smallest message possible is a single row, which can still be significantly larger than whatever the max_flight_data_size limit is for variable length columns (e.g. several large string columns).
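
For reference, configuring the limit looks roughly like this (a minimal sketch assuming the current `FlightDataEncoderBuilder` API in `arrow-flight`; not taken from IOx):

```rust
use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use futures::{stream, TryStreamExt};

async fn encode_batches(batches: Vec<RecordBatch>) -> Result<(), FlightError> {
    // The encoder consumes a stream of RecordBatches...
    let input = stream::iter(batches.into_iter().map(Ok));

    // ...and heuristically tries to keep each FlightData message under the
    // requested size, here ~4MB to stay below golang's default gRPC limit.
    let flight_data_stream = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(4 * 1024 * 1024)
        .build(input);

    // Each item is one FlightData message ready to send over gRPC.
    let messages: Vec<_> = flight_data_stream.try_collect().await?;
    println!("encoded {} FlightData messages", messages.len());
    Ok(())
}
```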

Describe the solution you'd like
I would like to improve the situation by handling nested types and reducing the encoded FlightData size more effectively.

Describe alternatives you've considered

  1. One approach would be to copy the data into a new record batch, "packing" it into a brand new memory space (this is the approach I plan to use as a workaround in IOx; see the sketch after this list) - this would result in the minimum sized flight data batches
  2. Another way might be to implement a slice / take that results in smaller data sizes (e.g. rewriting the data offsets for strings)
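
A rough sketch of alternative 1 (my own illustration, not the actual IOx workaround), using the `take` kernel to copy every column into freshly allocated buffers; how much this shrinks dictionaries and nested types depends on the kernel's behavior for those types:

```rust
use arrow::array::UInt32Array;
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Copy every column of `batch` into new buffers so that bytes only
/// reachable through a slice offset are not carried along when encoding.
fn compact_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Taking all rows by index forces each column to be rebuilt.
    let indices = UInt32Array::from_iter_values(0..batch.num_rows() as u32);
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}
```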

Additional context
See #3347

alamb added the arrow-flight (Changes to the arrow-flight crate) and enhancement (Any new improvement worthy of an entry in the changelog) labels on Jan 6, 2023

alamb commented Jan 6, 2023

In reviewing the arrow IPC writer code, it does appear to be clever about using offsets when actually writing (thanks @viirya in #2040 ❤️ )

```rust
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
        // where null_count is always 0.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.null_buffer() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.bit_slice(array_data.offset(), array_data.len()),
        };
        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
        )?;
    }
    let data_type = array_data.data_type();
    if matches!(
        data_type,
        DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
    ) {
        let offset_buffer = &array_data.buffers()[0];
        let value_offset_byte_width = get_value_offset_byte_width(data_type);
        let min_length = (array_data.len() + 1) * value_offset_byte_width;
        if buffer_need_truncate(
            array_data.offset(),
            offset_buffer,
            &BufferSpec::FixedWidth {
                byte_width: value_offset_byte_width,
            },
            min_length,
        ) {
            // Rebase offsets and truncate values
            let (new_offsets, byte_offset) =
                if matches!(data_type, DataType::Binary | DataType::Utf8) {
                    (
                        get_zero_based_value_offsets::<i32>(array_data),
                        get_buffer_offset::<i32>(array_data) as usize,
                    )
                } else {
                    (
                        get_zero_based_value_offsets::<i64>(array_data),
                        get_buffer_offset::<i64>(array_data) as usize,
                    )
                };
            offset = write_buffer(
                new_offsets.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
            let total_bytes = get_binary_buffer_len(array_data);
            let value_buffer = &array_data.buffers()[1];
            let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
            let buffer_slice =
                &value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
            offset = write_buffer(
                buffer_slice,
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        } else {
            for buffer in array_data.buffers() {
                offset = write_buffer(
                    buffer.as_slice(),
                    buffers,
                    arrow_data,
                    offset,
                    compression_codec,
                )?;
            }
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        assert!(array_data.buffers().len() == 1);
        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];
        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            let buffer_slice =
                &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
            offset = write_buffer(
                buffer_slice,
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        } else {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
            )?;
        }
    } else {
        for buffer in array_data.buffers() {
            offset =
                write_buffer(buffer, buffers, arrow_data, offset, compression_codec)?;
        }
    }
    if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
        // recursively write out nested structures
        for data_ref in array_data.child_data() {
            // write the nested data (e.g list data)
            offset = write_array_data(
                data_ref,
                buffers,
                arrow_data,
                nodes,
                offset,
                data_ref.len(),
                data_ref.null_count(),
                compression_codec,
                write_options,
            )?;
        }
    }
    Ok(offset)
}
```

However, I am not sure exactly how this will translate to flight data size -- I am writing some more tests now
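
As a quick way to see the sizes the encoder actually produces (a sketch of what such a test might measure, assuming the `FlightData` protobuf fields; it ignores `app_metadata` and protobuf framing overhead):

```rust
use arrow_flight::encode::FlightDataEncoder;
use arrow_flight::error::FlightError;
use futures::StreamExt;

/// Print the approximate wire size of each FlightData message from `encoder`.
async fn report_sizes(mut encoder: FlightDataEncoder) -> Result<(), FlightError> {
    while let Some(flight_data) = encoder.next().await {
        let flight_data = flight_data?;
        let approx_size = flight_data.data_header.len() + flight_data.data_body.len();
        println!("FlightData message: ~{approx_size} bytes");
    }
    Ok(())
}
```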

alamb commented Jan 6, 2023

PR with tests showing how far from optimal the current splitting logic is: #3481

@itsjunetime

take
