Skip to content

Commit

Permalink
feat(rust): add BinaryView to parquet writer/reader. (pola-rs#13489)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 10, 2024
1 parent e52ef5d commit 505ec71
Show file tree
Hide file tree
Showing 45 changed files with 1,629 additions and 1,086 deletions.
4 changes: 3 additions & 1 deletion crates/polars-arrow/src/array/binview/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ unsafe impl<T: ViewType + ?Sized> ToFfi for BinaryViewArrayGeneric<T> {
buffers: self.buffers.clone(),
raw_buffers: self.raw_buffers.clone(),
phantom: Default::default(),
total_bytes_len: self.total_bytes_len,
total_buffer_len: self.total_buffer_len,
}
}
}
Expand All @@ -67,7 +69,7 @@ impl<T: ViewType + ?Sized, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryViewArray
buffers.push(values);
}

Ok(Self::new_unchecked(
Ok(Self::new_unchecked_unknown_md(
data_type,
views,
Arc::from(buffers),
Expand Down
21 changes: 10 additions & 11 deletions crates/polars-arrow/src/array/binview/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::{Debug, Formatter, Result, Write};
use super::super::fmt::write_vec;
use super::BinaryViewArrayGeneric;
use crate::array::binview::ViewType;
use crate::array::Array;
use crate::array::{Array, BinaryViewArray, Utf8ViewArray};

pub fn write_value<'a, T: ViewType + ?Sized, W: Write>(
array: &'a BinaryViewArrayGeneric<T>,
Expand All @@ -19,19 +19,18 @@ where
write_vec(f, writer, None, bytes.len(), "None", false)
}

impl<T: ViewType + ?Sized> Debug for BinaryViewArrayGeneric<T>
where
for<'a> &'a T: Debug,
{
impl Debug for BinaryViewArray {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
let writer = |f: &mut Formatter, index| write_value(self, index, f);
write!(f, "BinaryViewArray")?;
write_vec(f, writer, self.validity(), self.len(), "None", false)
}
}

let head = if T::IS_UTF8 {
"Utf8ViewArray"
} else {
"BinaryViewArray"
};
write!(f, "{head}")?;
impl Debug for Utf8ViewArray {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
let writer = |f: &mut Formatter, index| write!(f, "{}", self.value(index));
write!(f, "Utf8ViewArray")?;
write_vec(f, writer, self.validity(), self.len(), "None", false)
}
}
136 changes: 116 additions & 20 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ mod private {
impl Sealed for str {}
impl Sealed for [u8] {}
}
pub use mutable::MutableBinaryViewArray;
use private::Sealed;

use crate::array::binview::iterator::BinaryViewValueIter;
use crate::array::binview::mutable::MutableBinaryViewArray;
use crate::array::binview::view::{validate_binary_view, validate_utf8_view};
use crate::array::binview::view::{
validate_binary_view, validate_utf8_only_view, validate_utf8_view,
};
use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};

pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;

pub trait ViewType: Sealed + 'static + PartialEq {
pub trait ViewType: Sealed + 'static + PartialEq + AsRef<Self> {
const IS_UTF8: bool;
const DATA_TYPE: ArrowDataType;
type Owned: Debug + Clone + Sync + Send + AsRef<Self>;
Expand Down Expand Up @@ -97,6 +99,10 @@ pub struct BinaryViewArrayGeneric<T: ViewType + ?Sized> {
raw_buffers: Arc<[(*const u8, usize)]>,
validity: Option<Bitmap>,
phantom: PhantomData<T>,
/// Total bytes length if we would concatenate them all.
total_bytes_len: usize,
/// Total bytes in the buffer (excluding remaining capacity)
total_buffer_len: usize,
}

impl<T: ViewType + ?Sized> Clone for BinaryViewArrayGeneric<T> {
Expand All @@ -108,6 +114,8 @@ impl<T: ViewType + ?Sized> Clone for BinaryViewArrayGeneric<T> {
raw_buffers: self.raw_buffers.clone(),
validity: self.validity.clone(),
phantom: Default::default(),
total_bytes_len: self.total_bytes_len,
total_buffer_len: self.total_buffer_len,
}
}
}
Expand All @@ -132,6 +140,8 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
views: Buffer<u128>,
buffers: Arc<[Buffer<u8>]>,
validity: Option<Bitmap>,
total_bytes_len: usize,
total_buffer_len: usize,
) -> Self {
let raw_buffers = buffers_into_raw(&buffers);
Self {
Expand All @@ -141,11 +151,34 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
raw_buffers,
validity,
phantom: Default::default(),
total_bytes_len,
total_buffer_len,
}
}

pub fn data_buffers(&self) -> &[Buffer<u8>] {
self.buffers.as_ref()
/// Create a new BinaryViewArray but initialize a statistics compute.
/// # Safety
/// The caller must ensure the invariants
pub unsafe fn new_unchecked_unknown_md(
data_type: ArrowDataType,
views: Buffer<u128>,
buffers: Arc<[Buffer<u8>]>,
validity: Option<Bitmap>,
) -> Self {
let total_bytes_len = views.iter().map(|v| (*v as u32) as usize).sum();
let total_buffer_len = buffers.iter().map(|b| b.len()).sum();
Self::new_unchecked(
data_type,
views,
buffers,
validity,
total_bytes_len,
total_buffer_len,
)
}

pub fn data_buffers(&self) -> &Arc<[Buffer<u8>]> {
&self.buffers
}

pub fn variadic_buffer_lengths(&self) -> Vec<i64> {
Expand All @@ -172,28 +205,33 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
polars_ensure!(validity.len()== views.len(), ComputeError: "validity mask length must match the number of values" )
}

let raw_buffers = buffers_into_raw(&buffers);
Ok(Self {
data_type,
views,
buffers,
raw_buffers,
validity,
phantom: Default::default(),
})
unsafe {
Ok(Self::new_unchecked_unknown_md(
data_type, views, buffers, validity,
))
}
}

/// Creates an empty [`BinaryViewArrayGeneric`], i.e. whose `.len` is zero.
#[inline]
pub fn new_empty(data_type: ArrowDataType) -> Self {
unsafe { Self::new_unchecked(data_type, Buffer::new(), Arc::from([]), None) }
unsafe { Self::new_unchecked(data_type, Buffer::new(), Arc::from([]), None, 0, 0) }
}

/// Returns a new null [`BinaryViewArrayGeneric`] of `length`.
#[inline]
pub fn new_null(data_type: ArrowDataType, length: usize) -> Self {
let validity = Some(Bitmap::new_zeroed(length));
unsafe { Self::new_unchecked(data_type, Buffer::zeroed(length), Arc::from([]), validity) }
unsafe {
Self::new_unchecked(
data_type,
Buffer::zeroed(length),
Arc::from([]),
validity,
0,
0,
)
}
}

/// Returns the element at index `i`
Expand Down Expand Up @@ -248,20 +286,79 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
}

/// Returns an iterator of the non-null values.
#[inline]
pub fn non_null_values_iter(&self) -> NonNullValuesIter<'_, BinaryViewArrayGeneric<T>> {
NonNullValuesIter::new(self, self.validity())
}

/// Returns an iterator of the non-null values.
pub fn non_null_views_iter(&self) -> NonNullValuesIter<'_, Buffer<u128>> {
NonNullValuesIter::new(self.views(), self.validity())
}

impl_sliced!();
impl_mut_validity!();
impl_into_array!();

pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
let mutable =
MutableBinaryViewArray::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()));
let mutable = MutableBinaryViewArray::from_iterator(
slice.as_ref().iter().map(|opt_v| opt_v.as_ref()),
);
mutable.into()
}

/// Get the total length of bytes that it would take to concatenate all binary/str values in this array.
pub fn total_bytes_len(&self) -> usize {
self.total_bytes_len
}

/// Get the length of bytes that are stored in the variadic buffers.
pub fn total_buffer_len(&self) -> usize {
self.total_buffer_len
}
}

impl BinaryViewArray {
/// Validate the underlying bytes on UTF-8.
pub fn validate_utf8(&self) -> PolarsResult<()> {
validate_utf8_only_view(&self.views, &self.buffers)
}

/// Convert [`BinaryViewArray`] to [`Utf8ViewArray`].
pub fn to_utf8view(&self) -> PolarsResult<Utf8ViewArray> {
self.validate_utf8()?;
unsafe { Ok(self.to_utf8view_unchecked()) }
}

/// Convert [`BinaryViewArray`] to [`Utf8ViewArray`] without checking UTF-8.
///
/// # Safety
/// The caller must ensure the underlying data is valid UTF-8.
pub unsafe fn to_utf8view_unchecked(&self) -> Utf8ViewArray {
Utf8ViewArray::new_unchecked(
ArrowDataType::Utf8View,
self.views.clone(),
self.buffers.clone(),
self.validity.clone(),
self.total_bytes_len,
self.total_buffer_len,
)
}
}

impl Utf8ViewArray {
pub fn to_binview(&self) -> BinaryViewArray {
// SAFETY: same invariants.
unsafe {
BinaryViewArray::new_unchecked(
ArrowDataType::BinaryView,
self.views.clone(),
self.buffers.clone(),
self.validity.clone(),
self.total_bytes_len,
self.total_buffer_len,
)
}
}
}

impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
Expand Down Expand Up @@ -291,7 +388,6 @@ impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
"the offset of the new Buffer cannot exceed the existing length"
);
unsafe { self.slice_unchecked(offset, length) }
todo!()
}

unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
Expand Down
Loading

0 comments on commit 505ec71

Please sign in to comment.