Skip to content

Commit

Permalink
fix read_avro kwargs + cleanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marctorsoc committed Oct 15, 2022
1 parent 6d60fe3 commit 2dde965
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
46 changes: 35 additions & 11 deletions pandavro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import OrderedDict
from pathlib import Path
from typing import Optional, Iterable
from typing import Iterable, Optional

import fastavro
import numpy as np
Expand Down Expand Up @@ -184,15 +184,39 @@ def schema_infer(df, times_as_micros=True):
return schema


def __file_to_dataframe(f, schema, na_dtypes=False, columns: Optional[Iterable[str]] = None, **kwargs):
def __file_to_dataframe(
f,
schema,
na_dtypes=False,
columns: Optional[Iterable[str]] = None,
exclude: Optional[Iterable[str]] = None,
nrows: Optional[int] = None,
**kwargs,
):
reader = fastavro.reader(f, reader_schema=schema)
if columns is None:
records = list(reader)
# To free up some RAM we can select a subset of columns
else:
columns_set = frozenset(columns)
records = [{k: v for k, v in row.items() if k in columns_set} for row in reader]
columns_to_include = frozenset(columns) if columns else set()
columns_to_exclude = frozenset(exclude) if exclude else set()

records = []
for row_idx, row in enumerate(reader):

# stop if we reached nrows
if nrows and row_idx == nrows:
break

# include if columns_to_include not defined OR column in columns_to_include
# AND
# remove if columns_to_exclude not defined OR column in columns_to_exclude
records.append(
{
column: column_value
for column, column_value in row.items()
if len(columns_to_include) == 0 or column in columns_to_include
if len(columns_to_exclude) == 0 or column not in columns_to_exclude
}
)

# add columns again to indicate the order of the resulting dataframe
df = pd.DataFrame.from_records(records, columns=columns, **kwargs)

def _filter(typelist):
Expand Down Expand Up @@ -284,9 +308,9 @@ def __preprocess_dicts(l):
if v is pd.NA:
d[k] = None
# Convert some Pandas dtypes to normal Python dtypes
for key, value in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, key):
d[k] = value(v)
for pandas_type, converter in PANDAS_TO_PYTHON_TYPES.items():
if isinstance(v, pandas_type):
d[k] = converter(v)
return l


Expand Down
28 changes: 19 additions & 9 deletions tests/pandavro_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import subprocess
import timeit
from datetime import timezone
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
Expand All @@ -16,14 +16,18 @@
def dataframe():
    """Build the reference 8-row DataFrame covering the dtypes exercised by the tests.

    Columns: Boolean, tz-aware DateTime64 (UTC), Float64, Int64, String, Bytes.
    The Float64/Int64 values are random; the other columns are deterministic.
    """
    strings = ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'bar']
    return pd.DataFrame({"Boolean": [True, False, True, False, True, False, True, False],
                         # tz-aware on purpose: the commit switched the fixture to UTC timestamps
                         "DateTime64": pd.date_range('20190101', '20190108', freq="1D", tz=timezone.utc),
                         "Float64": np.random.randn(8),
                         "Int64": np.random.randint(0, 10, 8),
                         "String": strings,
                         "Bytes": [string.encode() for string in strings],
                         })


def process_datetime64_column(df):
    """Convert the tz-aware 'DateTime64' column of *df* to UTC, in place.

    Assumes the column is already tz-aware (as produced by the fixture /
    read-back path); tz_convert raises on a tz-naive column.
    """
    # Vectorized .dt accessor instead of a per-element .apply — same result,
    # one C-level pass over the column.
    df['DateTime64'] = df['DateTime64'].dt.tz_convert(timezone.utc)


def test_schema_infer(dataframe):
expect = {
'type': 'record',
Expand Down Expand Up @@ -116,31 +120,31 @@ def test_buffer_e2e(dataframe):
pdx.to_avro(tf.name, dataframe)
with open(tf.name, 'rb') as f:
expect = pdx.read_avro(BytesIO(f.read()))
expect['DateTime64'] = expect['DateTime64'].apply(lambda t: t.tz_localize(None))
process_datetime64_column(expect)
assert_frame_equal(expect, dataframe)


def test_file_path_e2e(dataframe):
    """End-to-end round trip: write to a file path, read back, compare frames."""
    tf = NamedTemporaryFile()
    pdx.to_avro(tf.name, dataframe)
    expect = pdx.read_avro(tf.name)
    # Normalize the read-back tz-aware timestamps to UTC before comparing.
    # (The stale pre-commit tz_localize(None) line is removed: stripping the tz
    # first would make the subsequent tz_convert raise on a naive column.)
    process_datetime64_column(expect)
    assert_frame_equal(expect, dataframe)


def test_pathlib_e2e(dataframe):
    """End-to-end round trip using pathlib.Path arguments for both write and read."""
    tf = NamedTemporaryFile()
    pdx.to_avro(Path(tf.name), dataframe)
    expect = pdx.read_avro(Path(tf.name))
    # Normalize the read-back tz-aware timestamps to UTC before comparing.
    # (Removed the stale pre-commit tz_localize(None) line that the diff page
    # rendered alongside its replacement.)
    process_datetime64_column(expect)
    assert_frame_equal(expect, dataframe)


def test_delegation(dataframe):
    """from_avro must behave as an alias of read_avro for a full round trip."""
    tf = NamedTemporaryFile()
    pdx.to_avro(tf.name, dataframe)
    expect = pdx.from_avro(tf.name)
    # Normalize the read-back tz-aware timestamps to UTC before comparing.
    # (Removed the stale pre-commit tz_localize(None) line left in by the
    # diff rendering.)
    process_datetime64_column(expect)
    assert_frame_equal(expect, dataframe)


Expand All @@ -149,7 +153,7 @@ def test_append(dataframe):
pdx.to_avro(tf.name, dataframe[0:int(dataframe.shape[0] / 2)])
pdx.to_avro(tf.name, dataframe[int(dataframe.shape[0] / 2):], append=True)
expect = pdx.from_avro(tf.name)
expect['DateTime64'] = expect['DateTime64'].apply(lambda t: t.tz_localize(None))
process_datetime64_column(expect)
assert_frame_equal(expect, dataframe)


Expand All @@ -168,15 +172,21 @@ def test_dataframe_kwargs(dataframe):
# exclude columns
columns = ['String', 'Boolean']
expect = pdx.read_avro(tf.name, exclude=columns)
expect['DateTime64'] = expect['DateTime64'].apply(lambda t: t.tz_localize(None))
process_datetime64_column(expect)
df = dataframe.drop(columns, axis=1)
assert_frame_equal(expect, df)
# specify index
index = 'String'
expect = pdx.read_avro(tf.name, index=index)
expect['DateTime64'] = expect['DateTime64'].apply(lambda t: t.tz_localize(None))
process_datetime64_column(expect)
df = dataframe.set_index(index)
assert_frame_equal(expect, df)
# specify nrows + exclude columns
columns = ['String', 'Boolean']
expect = pdx.read_avro(tf.name, exclude=columns, nrows=3)
process_datetime64_column(expect)
df = dataframe.drop(columns, axis=1).head(3)
assert_frame_equal(expect, df)


@pytest.fixture
Expand Down

0 comments on commit 2dde965

Please sign in to comment.