Commit 8118025

Revert "Speedup FrametoArray serializer for ChunkStore by removing in…
Browse files Browse the repository at this point in the history
…termediate DataFrame construction. (man-group#909)"

This reverts commit 160d261.
BaiBaiHi committed Jan 7, 2022
1 parent ddb31d0 commit 8118025
Showing 1 changed file with 23 additions and 39 deletions.
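
The optimization being reverted swapped per-chunk DataFrame construction for plain NumPy concatenation. A rough sketch of the two approaches, with invented column names and data (arctic's chunk documents and objify are not involved here):

```python
import numpy as np
import pandas as pd
from collections import defaultdict

chunks = [{'price': np.array([1.0, 2.0])},
          {'price': np.array([3.0, 4.0])}]

# Reverted approach (man-group#909): gather raw arrays per column,
# concatenate once, and build a single DataFrame at the very end.
acc = defaultdict(list)
for chunk in chunks:
    for col, arr in chunk.items():
        acc[col].append(arr)
fast = pd.DataFrame({col: np.concatenate(arrs) for col, arrs in acc.items()})

# Restored approach: build one DataFrame per chunk, then pd.concat.
slow = pd.concat([pd.DataFrame(chunk) for chunk in chunks], ignore_index=True)

assert fast.equals(slow)  # same result, different intermediates
```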
arctic/serialization/numpy_arrays.py (62 changes: 23 additions & 39 deletions)
@@ -7,7 +7,7 @@
 
 from .._compression import compress, decompress, compress_array
 from ._serializer import Serializer
-from collections import defaultdict
+
 try:
     from pandas.api.types import infer_dtype
 except ImportError:
@@ -139,35 +139,31 @@ def docify(self, df):
 
         return doc
 
-    def objify(self, doc, columns=None, as_df=True):
+    def objify(self, doc, columns=None):
         """
         Decode a Pymongo SON object into a Pandas DataFrame
         """
         cols = columns or doc[METADATA][COLUMNS]
         data = {}
-        valid_columns = doc[METADATA][LENGTHS]
-        missing_columns = set(cols).difference(valid_columns)
-        for col in valid_columns:
-            d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
-            # d is read-only, but that's not an issue since DataFrame will copy the data anyway.
-            d = np.frombuffer(d, doc[METADATA][DTYPE][col])
-
-            if MASK in doc[METADATA] and col in doc[METADATA][MASK]:
-                mask_data = decompress(doc[METADATA][MASK][col])
-                mask = np.frombuffer(mask_data, 'bool')
-                d = ma.masked_array(d, mask)
-            data[col] = d
-
-        for col in missing_columns:
-            # if there is missing data in a chunk, we can default to NaN
-            empty = np.empty(len(d))
-            empty[:] = np.nan
-            data[col] = empty
-
-        if as_df:
-            return pd.DataFrame(data)
-
-        return data
+        for col in cols:
+            # if there is missing data in a chunk, we can default to NaN
+            # and pandas will autofill the missing values to the correct length
+            if col not in doc[METADATA][LENGTHS]:
+                d = np.array(np.nan)
+            else:
+                d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
+                # d is read-only, but that's not an issue since DataFrame will copy the data anyway.
+                d = np.frombuffer(d, doc[METADATA][DTYPE][col])
+
+                if MASK in doc[METADATA] and col in doc[METADATA][MASK]:
+                    mask_data = decompress(doc[METADATA][MASK][col])
+                    mask = np.frombuffer(mask_data, 'bool')
+                    d = ma.masked_array(d, mask)
+            data[col] = d
+
+        # Copy into
+        return pd.DataFrame(data, columns=cols, copy=True)[cols]
 
 
 class FrametoArraySerializer(Serializer):
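
For orientation, the restored objify builds one DataFrame per decoded chunk: present columns come back as read-only NumPy views over the decompressed bytes (copied via copy=True), absent columns fall back to NaN, and masked columns round-trip through numpy.ma. A minimal sketch of the three cases with made-up bytes and column names (arctic's decompress and the METADATA document are not involved; the diff's 0-d np.array(np.nan) is shown as a plain NaN scalar for portability):

```python
import numpy as np
import numpy.ma as ma
import pandas as pd

# Stand-in for decompress(...) output: raw little-endian int64 bytes.
raw = np.arange(4, dtype='int64').tobytes()

# np.frombuffer returns a read-only view over the buffer...
a = np.frombuffer(raw, dtype='int64')
assert not a.flags.writeable

# ...and a stored boolean mask turns entries into NaN via a masked array.
m = ma.masked_array(np.array([10.0, 20.0, 30.0, 40.0]),
                    mask=[False, True, False, False])

data = {
    'a': a,       # column present in this chunk
    'b': np.nan,  # missing column: NaN, broadcast to the frame's length
    'c': m,       # masked column: row 1 comes out as NaN
}
cols = ['a', 'b', 'c']

# copy=True gives the frame its own writable storage, so handing out the
# read-only frombuffer view is safe; [cols] pins the column order.
df = pd.DataFrame(data, columns=cols, copy=True)[cols]
print(df)
#    a   b     c
# 0  0 NaN  10.0
# 1  1 NaN   NaN
# 2  2 NaN  30.0
# 3  3 NaN  40.0
```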
@@ -226,24 +222,12 @@ def deserialize(self, data, columns=None):
             raise Exception("Duplicate columns specified, cannot de-serialize")
 
         if not isinstance(data, list):
-            data = [data]
-
-        df = defaultdict(list)
-
-        for d in data:
-            for k, v in self.converter.objify(d, columns, as_df=False).items():
-                df[k].append(v)
-
-        idx_cols = meta[INDEX] if index else []
-        if len(idx_cols) > 1:
-            idx = pd.MultiIndex.from_arrays([np.concatenate(df[k]) for k in idx_cols], names=idx_cols)
-        elif len(idx_cols) == 1:
-            idx = pd.Index(np.concatenate(df[idx_cols[0]]), name=idx_cols[0])
+            df = self.converter.objify(data, columns)
         else:
-            idx = None
-
-        df = pd.DataFrame({k: np.concatenate(v) for k, v in df.items() if k not in idx_cols}, index=idx)
+            df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=not index)
 
         if index:
             df = df.set_index(meta[INDEX])
         if meta[TYPE] == 'series':
             return df[df.columns[0]]
         return df
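
The restored deserialize path simply concatenates the per-chunk frames. A small sketch of that flow with invented data, assuming two frames shaped like objify's output and a stored index column 'date' (so index is truthy and ignore_index=not index is False):

```python
import pandas as pd

# Two per-chunk frames, as objify would return them (values invented).
chunk1 = pd.DataFrame({'date': pd.date_range('2022-01-01', periods=2),
                       'price': [1.0, 2.0]})
chunk2 = pd.DataFrame({'date': pd.date_range('2022-01-03', periods=2),
                       'price': [3.0, 4.0]})

# With stored index columns, ignore_index=not index is False; the default
# integer row labels are discarded by set_index below anyway.
df = pd.concat([chunk1, chunk2], ignore_index=False)

# deserialize then promotes the stored index columns, here meta[INDEX] == ['date'].
df = df.set_index(['date'])
print(df)
#             price
# date
# 2022-01-01    1.0
# 2022-01-02    2.0
# 2022-01-03    3.0
# 2022-01-04    4.0
```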
