Add as_dicts to Fugue API (#522)
* Add as_dicts to Fugue API

* update dask

* update duckdb

* update the ibis and spark

* update ray

* update ray
goodwanghan authored Oct 27, 2023
1 parent 7860865 commit 83f2b2b
Showing 21 changed files with 678 additions and 132 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_ray.yml
@@ -21,7 +21,7 @@ concurrency:

jobs:
test_ray_lower_bound:
name: Ray 2.1.0
name: Ray 2.4.0
runs-on: ubuntu-latest

steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Setup Ray
run: pip install ray[data]==2.1.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
run: pip install ray[data]==2.4.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
- name: Test
run: make testray

1 change: 1 addition & 0 deletions fugue/api.py
@@ -6,6 +6,7 @@
as_array_iterable,
as_arrow,
as_dict_iterable,
as_dicts,
as_fugue_df,
as_pandas,
drop_columns,
21 changes: 19 additions & 2 deletions fugue/dataframe/api.py
@@ -116,15 +116,32 @@ def as_array_iterable(
return as_fugue_df(df).as_array_iterable(columns=columns, type_safe=type_safe)


@fugue_plugin
def as_dicts(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""Convert any dataframe to a list of python dicts
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
:return: a list of python dicts
.. note::
The default implementation enforces ``type_safe`` True
"""
return as_fugue_df(df).as_dicts(columns=columns)


@fugue_plugin
def as_dict_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert any dataframe to iterable of native python dicts
"""Convert any dataframe to iterable of python dicts
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
:return: iterable of native python dicts
:return: iterable of python dicts
.. note::
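
For reference, a minimal usage sketch of the new top-level function added above; the example DataFrame is made up, and fugue.api is assumed to be imported as fa:

import pandas as pd
import fugue.api as fa

# as_dicts accepts anything Fugue recognizes as a dataframe and returns a
# list of python dicts; columns optionally restricts and reorders the output.
df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
fa.as_dicts(df)                 # [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
fa.as_dicts(df, columns=["b"])  # [{'b': 'x'}, {'b': 'y'}]
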
59 changes: 48 additions & 11 deletions fugue/dataframe/arrow_dataframe.py
@@ -21,6 +21,10 @@

from .api import (
alter_columns,
as_array,
as_array_iterable,
as_dict_iterable,
as_dicts,
as_pandas,
drop_columns,
get_column_names,
@@ -30,6 +34,12 @@
select_columns,
)
from .dataframe import DataFrame, LocalBoundedDataFrame, _input_schema
from .utils import (
pa_table_as_array,
pa_table_as_array_iterable,
pa_table_as_dict_iterable,
pa_table_as_dicts,
)


class ArrowDataFrame(LocalBoundedDataFrame):
@@ -174,21 +184,20 @@ def as_arrow(self, type_safe: bool = False) -> pa.Table:
def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))
return pa_table_as_array(self.native, columns=columns)

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
return pa_table_as_dicts(self.native, columns=columns)

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
if self.empty:
return
if columns is not None:
for x in self[columns].as_array_iterable(type_safe=type_safe):
yield x
else:
d = self.native.to_pydict()
cols = [d[n] for n in self.columns]
for arr in zip(*cols):
yield list(arr)
yield from pa_table_as_array_iterable(self.native, columns=columns)

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
yield from pa_table_as_dict_iterable(self.native, columns=columns)


@as_local.candidate(lambda df: isinstance(df, pa.Table))
@@ -212,6 +221,34 @@ def _pa_table_as_pandas(df: pa.Table) -> pd.DataFrame:
)


@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array(
df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return pa_table_as_array(df, columns=columns)


@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array_iterable(
df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
yield from pa_table_as_array_iterable(df, columns=columns)


@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dicts(
df: pa.Table, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
return pa_table_as_dicts(df, columns=columns)


@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dict_iterable(
df: pa.Table, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
yield from pa_table_as_dict_iterable(df, columns=columns)


@alter_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_alter_columns(
df: pa.Table, columns: Any, as_fugue: bool = False
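
With the pa.Table candidates registered above, the same API can be called on a pyarrow Table directly; a small sketch with made-up data:

import pyarrow as pa
import fugue.api as fa

tbl = pa.table({"a": [1, 2], "b": ["x", "y"]})
# Dispatches to the pa.Table candidates registered in this file.
fa.as_dicts(tbl)                               # [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
list(fa.as_dict_iterable(tbl, columns=["b"]))  # [{'b': 'x'}, {'b': 'y'}]
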
22 changes: 20 additions & 2 deletions fugue/dataframe/dataframe.py
@@ -237,13 +237,31 @@ def head(
"""
raise NotImplementedError

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Convert to a list of python dicts
:param columns: columns to extract, defaults to None
:return: a list of python dicts
.. note::
The default implementation enforces ``type_safe`` True
"""
if columns is None:
columns = self.columns
idx = range(len(columns))
return [
{columns[i]: x[i] for i in idx}
for x in self.as_array(columns, type_safe=True)
]

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert to iterable of native python dicts
"""Convert to iterable of python dicts
:param columns: columns to extract, defaults to None
:return: iterable of native python dicts
:return: iterable of python dicts
.. note::
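
To make the default implementation above concrete, here is the same row-to-dict pairing written out on plain python values (the rows stand in for what as_array(columns, type_safe=True) would return):

columns = ["a", "b"]
rows = [[1, "x"], [2, "y"]]  # stand-in for self.as_array(columns, type_safe=True)
idx = range(len(columns))
# Pair each positional row with the column names, exactly as in as_dicts above.
dicts = [{columns[i]: row[i] for i in idx} for row in rows]
# dicts == [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
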
2 changes: 1 addition & 1 deletion fugue/dataframe/function_wrapper.py
@@ -269,7 +269,7 @@ def count(self, df: EmptyAwareIterable[List[Any]]) -> int:
class _ListDictParam(_LocalNoSchemaDataFrameParam):
@no_type_check
def to_input_data(self, df: DataFrame, ctx: Any) -> List[Dict[str, Any]]:
return list(df.as_local().as_dict_iterable())
return df.as_local().as_dicts()

@no_type_check
def to_output_df(
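
A consequence of the change above is that a function whose input is annotated List[Dict[str, Any]] is now fed by df.as_local().as_dicts(). A sketch of such a function used through fa.transform; the data, schema, and function name are illustrative only, not part of this commit:

from typing import Any, Dict, List

import pandas as pd
import fugue.api as fa

# Functions with a List[Dict[str, Any]] input annotation receive the output
# of as_dicts() via _ListDictParam.to_input_data after this change.
def add_flag(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{**r, "flag": r["a"] > 1} for r in rows]

fa.transform(pd.DataFrame({"a": [1, 2]}), add_flag, schema="a:long,flag:bool")
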
3 changes: 3 additions & 0 deletions fugue/dataframe/iterable_dataframe.py
@@ -105,6 +105,9 @@ def as_array(
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
return list(self.as_dict_iterable(columns))

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
73 changes: 73 additions & 0 deletions fugue/dataframe/pandas_dataframe.py
@@ -1,8 +1,11 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
from triad import assert_or_throw
from triad.collections.schema import Schema
from triad.utils.pandas_like import PD_UTILS
from triad.utils.pyarrow import pa_batch_to_dicts

from fugue.dataset.api import (
as_fugue_dataset,
@@ -17,6 +20,10 @@
from fugue.exceptions import FugueDataFrameOperationError

from .api import (
as_array,
as_array_iterable,
as_dict_iterable,
as_dicts,
drop_columns,
get_column_names,
get_schema,
@@ -134,6 +141,9 @@ def alter_columns(self, columns: Any) -> DataFrame:
return self
return PandasDataFrame(self.native, new_schema)

def as_arrow(self, type_safe: bool = False) -> pa.Table:
return PD_UTILS.as_arrow(self.native, schema=self.schema.pa_schema)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
@@ -150,6 +160,18 @@ def as_array_iterable(
):
yield row

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(self.native, columns, self.schema):
res += block
return res

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(self.native, columns, self.schema):
yield from block

def head(
self, n: int, columns: Optional[List[str]] = None
) -> LocalBoundedDataFrame:
@@ -272,6 +294,43 @@ def _pd_head(
return _adjust_df(df.head(n), as_fugue=as_fugue)


@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(_pd_as_array_iterable(df, columns, type_safe=type_safe))


@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array_iterable(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
for row in PD_UTILS.as_array_iterable(
df,
columns=columns,
type_safe=type_safe,
):
yield row


@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dicts(
df: pd.DataFrame, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(df, columns):
res += block
return res


@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dict_iterable(
df: pd.DataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(df, columns):
yield from block


def _adjust_df(res: pd.DataFrame, as_fugue: bool):
return res if not as_fugue else PandasDataFrame(res)

@@ -280,3 +339,17 @@ def _assert_no_missing(df: pd.DataFrame, columns: Iterable[Any]) -> None:
missing = [x for x in columns if x not in df.columns]
if len(missing) > 0:
raise FugueDataFrameOperationError(f"found nonexistent columns: {missing}")


def _to_dicts(
df: pd.DataFrame,
columns: Optional[List[str]] = None,
schema: Optional[Schema] = None,
) -> Iterable[List[Dict[str, Any]]]:
cols = list(df.columns) if columns is None else columns
assert_or_throw(len(cols) > 0, ValueError("columns cannot be empty"))
pa_schema = schema.extract(cols).pa_schema if schema is not None else None
adf = PD_UTILS.as_arrow(df[cols], schema=pa_schema)
for batch in adf.to_batches():
if batch.num_rows > 0:
yield pa_batch_to_dicts(batch)
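
A simplified, self-contained sketch of the batching idea behind _to_dicts above, written directly against pyarrow instead of the triad helpers; the helper name and data are illustrative, not part of this commit:

from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
import pyarrow as pa

def to_dict_batches(
    df: pd.DataFrame, columns: Optional[List[str]] = None
) -> Iterable[List[Dict[str, Any]]]:
    # Convert through Arrow so type handling matches the Arrow-based code paths,
    # then yield one list of row dicts per non-empty record batch.
    cols = list(df.columns) if columns is None else columns
    table = pa.Table.from_pandas(df[cols], preserve_index=False)
    for batch in table.to_batches():
        if batch.num_rows > 0:
            data = batch.to_pydict()  # column name -> list of python values
            yield [dict(zip(cols, vals)) for vals in zip(*(data[c] for c in cols))]

# Example: list(to_dict_batches(pd.DataFrame({"a": [1, 2]}))) -> [[{'a': 1}, {'a': 2}]]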