Skip to content

Commit

Permalink
Merge branch 'master' into feature/add-pandas1-dtypes
Browse files Browse the repository at this point in the history
  • Loading branch information
ynqa committed Dec 13, 2020
2 parents d0c4855 + ad60a60 commit f2e714d
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 13 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include LICENSE
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ It prepares like pandas APIs:
- Read the records from Avro file and fit them into pandas DataFrame using [fastavro](https://github.com/tebeka/fastavro).
- `to_avro`
- Write the rows of pandas DataFrame to Avro file with the original schema infer.

## What can and can't pandavro do?

Avro can represent the following kinds of types:
Expand Down Expand Up @@ -57,6 +57,35 @@ Pandavro also supports these logical types:
| Numpy/pandas type | Avro logical type |
|-------------------------------------------------|--------------------|
| np.datetime64, pd.DatetimeTZDtype, pd.Timestamp | timestamp-micros* |
If a boolean column includes empty values, pandas classifies the column as having a dtype of `object` - this is accounted for in complex column handling.


And these complex types - all complex types other than 'fixed' will be classified by pandas as having a dtype of `object`, so their underlying python types are used to determine the Avro type:

| Numpy/Python type | Avro complex type |
|-------------------------------|-------------------|
| dict, collections.OrderedDict | record |
| list | array |
| np.void | fixed |

Record and array types can be arbitrarily nested within each other.

The schema definition of a record requires a unique name for the record separate from the column itself. This does not map to any concept in pandas, so for this we just append '_record' to the original column name and a number to ensure that there are zero duplicate 'name' values.

The remaining Avro complex types are not currently supported for the following reasons:
1. Enum: The closest pandas type to Avro's enum type is `pd.Categorical`, but it still is not a complete match. Possible values of the enum type can only be alphanumeric strings, whereas `pd.Categorical` values have no such limitation.
2. Map: No strictly matching concept in Python/pandas - Python dictionaries can have arbitrarily typed keys. Functionality can be essentially be achieved with the record type.
3. Union: Any column with mixed types (other than empty values/`NoneType`) are treated by pandas as having a dtype of `object`, and will be written as strings. It would be difficult to deterministically infer multiple allowed data types based solely on a column's contents.


And these logical types:

| Numpy/pandas type | Avro logical type |
|-------------------------------------------------|-----------------------------------|
| np.datetime64, pd.DatetimeTZDtype, pd.Timestamp | timestamp-micros/timezone-millis |

Note that the timestamp must not contain any timezone (it must be naive) because Avro does not support timezones.
Timestamps are encoded as microseconds by default, but can be encoded in milliseconds by using `times_as_micros=False`

\* If passed `to_avro(..., times_as_micros=False)`, this has a millisecond resolution.

Expand Down
97 changes: 88 additions & 9 deletions pandavro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections import OrderedDict

import fastavro
import numpy as np
Expand Down Expand Up @@ -92,15 +93,96 @@ def __type_infer(t):
raise TypeError('Invalid type: {}'.format(t))


def __fields_infer(df):
return [
def __complex_field_infer(df, field, nested_record_names):
NoneType = type(None)
bool_types = {bool, NoneType}
string_types = {str, NoneType}
record_types = {dict, OrderedDict, NoneType}
array_types = {list, NoneType}

base_field_types = set(df[field].apply(type))

# String type - have to check for string first, in case a column contains
# entirely 'None's
if base_field_types.issubset(string_types):
return 'string'
# Bool type - if a boolean field contains missing values, pandas will give
# its type as np.dtype('O'), so we have to double check for it here.
if base_field_types.issubset(bool_types):
return 'boolean'
# Record type
elif base_field_types.issubset(record_types):
records = df.loc[~df[field].isna(), field].reset_index(drop=True)

if field in nested_record_names:
nested_record_names[field] += 1
else:
nested_record_names[field] = 0
return {
'type': 'record',
'name': field + '_record' + str(nested_record_names[field]),
'fields': __fields_infer(pd.DataFrame.from_records(records),
nested_record_names)
}
# Array type
elif base_field_types.issubset(array_types):
arrays = pd.Series(df.loc[~df[field].isna(), field].sum(),
name=field).reset_index(drop=True)
if arrays.empty:
print('Array field \'{}\' has been provided containing only empty '
'lists. The intended type of its contents cannot be '
'inferred, so \'string\' was assumed.'.format(field))
items = 'string'
else:
items = __fields_infer(arrays.to_frame(),
nested_record_names)[0]['type']
return {
'type': 'array',
'items': items
}


def __fields_infer(df, nested_record_names):
inferred_fields = [
{'name': key, 'type': __type_infer(type_np)}
for key, type_np in six.iteritems(df.dtypes)
]
for field in inferred_fields:
if 'complex' in field['type']:
field['type'] = [
'null',
__complex_field_infer(df, field['name'], nested_record_names)
]
return inferred_fields


def __convert_field_micros_to_millis(field):
if isinstance(field, list):
for i in range(0, len(field)):
field[i] = __convert_field_micros_to_millis(field[i])
return field
elif isinstance(field, dict):
for key, item in field.items():
field[key] = __convert_field_micros_to_millis(item)
return field
elif isinstance(field, str):
if field == 'timestamp-micros':
return 'timestamp-millis'
else:
return field


def schema_infer(df, times_as_micros=True):
"""
Infers the Avro schema of a pandas DataFrame
def __schema_infer(df, times_as_micros):
fields = __fields_infer(df)
Args:
df: DataFrame to infer the schema of
times_as_micros:
Whether timestamps should be stored as microseconds (default)
or milliseconds (as expected by Apache Hive)
"""
fields = __fields_infer(df, {})
schema = {
'type': 'record',
'name': 'Root',
Expand All @@ -110,10 +192,7 @@ def __schema_infer(df, times_as_micros):
# Patch 'timestamp-millis' in
if not times_as_micros:
for field in schema['fields']:
non_null_type = field['type'][1]
if isinstance(non_null_type, dict):
if non_null_type.get('logicalType') == 'timestamp-micros':
non_null_type['logicalType'] = 'timestamp-millis'
field = __convert_field_micros_to_millis(field)
return schema


Expand Down Expand Up @@ -226,7 +305,7 @@ def to_avro(file_path_or_buffer, df, schema=None, append=False,
"""
if schema is None:
schema = __schema_infer(df, times_as_micros)
schema = schema_infer(df, times_as_micros)

open_mode = 'wb' if not append else 'a+b'

Expand Down
43 changes: 40 additions & 3 deletions tests/pandavro_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_schema_infer(dataframe):
{'type': ['null', 'string'], 'name': 'String'},
]
}
assert expect == pdx.__schema_infer(dataframe, times_as_micros=True)
assert expect == pdx.schema_infer(dataframe, times_as_micros=True)


def test_schema_infer_times_as_millis(dataframe):
Expand All @@ -51,7 +51,44 @@ def test_schema_infer_times_as_millis(dataframe):
{'type': ['null', 'string'], 'name': 'String'},
]
}
assert expect == pdx.__schema_infer(dataframe, times_as_micros=False)
assert expect == pdx.schema_infer(dataframe, times_as_micros=False)


def test_schema_infer_complex_types(dataframe):
expect = {
'type': 'record',
'name': 'Root',
'fields':
[
{'type': ['null', 'boolean'], 'name': 'Boolean'},
{'type': ['null', {'logicalType': 'timestamp-micros', 'type': 'long'}],
'name': 'DateTime64'},
{'type': ['null', 'double'], 'name': 'Float64'},
{'type': ['null', 'long'], 'name': 'Int64'},
{'type': ['null', 'string'], 'name': 'String'},
{'type': ['null', {
'fields':
[
{'name': 'field1', 'type': ['null', 'long']},
{'name': 'field2', 'type': ['null', 'string']}
],
'name': 'Record_record0',
'type': 'record'}],
'name': 'Record'},
{'type': ['null', {'type': 'array', 'items': ['null', 'long']}],
'name': 'Array'}
]
}
dataframe["Record"] = [
{'field1': 1, 'field2': 'str1'}, {'field1': 2, 'field2': 'str2'},
{'field1': 3, 'field2': 'str3'}, {'field1': 4, 'field2': 'str4'},
{'field1': 5, 'field2': 'str5'}, {'field1': 6, 'field2': 'str6'},
{'field1': 7, 'field2': 'str7'}, {'field1': 8, 'field2': 'str8'}]
dataframe["Array"] = [
[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16]
]

assert expect == pdx.schema_infer(dataframe, times_as_micros=True)


def test_fields_infer(dataframe):
Expand All @@ -63,7 +100,7 @@ def test_fields_infer(dataframe):
{'type': ['null', 'long'], 'name': 'Int64'},
{'type': ['null', 'string'], 'name': 'String'},
]
assert expect == pdx.__fields_infer(dataframe)
assert expect == pdx.__fields_infer(dataframe, nested_record_names={})


def test_buffer_e2e(dataframe):
Expand Down

0 comments on commit f2e714d

Please sign in to comment.