
Commit

Fix Pandas 2+ and Spark < 3.4 incompatibility issues (#474)
* Fix duckdb compatibility issues

* update

* update

* Update spark

* update

* update

* update

* update

* update

* replace cloudpickle with pickle in fugue_spark
goodwanghan authored Jun 6, 2023
1 parent 936668b commit b92be71
Showing 9 changed files with 142 additions and 100 deletions.
64 changes: 64 additions & 0 deletions .github/workflows/test_spark.yml
@@ -0,0 +1,64 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Spark Tests

on:
  push:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'
  pull_request:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test_combinations:
    name: Spark ${{ matrix.spark-version }} Pandas ${{ matrix.pandas-version }}
    runs-on: ubuntu-latest
    strategy:
      matrix:
        spark-version: ["3.1.1","3.4.0"]
        pandas-version: ["1.5.3","2.0.1"]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.9
        uses: actions/setup-python@v1
        with:
          python-version: 3.9
      - name: Install dependencies
        run: make devenv
      - name: Install Spark ${{ matrix.spark-version }}
        run: pip install "pyspark==${{ matrix.spark-version }}"
      - name: Install Pandas ${{ matrix.pandas-version }}
        run: pip install "pandas==${{ matrix.pandas-version }}"
      - name: Downgrade Ibis
        if: matrix.spark-version < '3.4.0'
        run: pip install "ibis-framework<5"
      - name: Test
        run: make testspark

  test_connect:
    name: Spark Connect
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.9
        uses: actions/setup-python@v1
        with:
          python-version: 3.9
      - name: Install dependencies
        run: make devenv
      - name: Setup Spark
        run: make sparkconnect
      - name: Test
        run: make testsparkconnect
41 changes: 0 additions & 41 deletions .github/workflows/test_spark_connect.yml

This file was deleted.

50 changes: 42 additions & 8 deletions fugue_spark/_utils/convert.py
@@ -1,6 +1,6 @@
import pickle
from typing import Any, Iterable, List, Tuple

import cloudpickle
import pandas as pd
import pyarrow as pa
import pyspark.sql as ps
@@ -11,8 +11,18 @@
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
from triad.utils.schema import quote_name

import fugue.api as fa
from fugue import DataFrame

from .misc import is_spark_dataframe

try:
    from pyspark.sql.types import TimestampNTZType  # pylint: disable-all
except ImportError:  # pragma: no cover
    # pyspark < 3.2
    from pyspark.sql.types import TimestampType as TimestampNTZType


def to_spark_schema(obj: Any) -> pt.StructType:
    assert_arg_not_none(obj, "schema")
@@ -108,20 +118,44 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
yield r


def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.DataFrame:
    if schema is not None and not isinstance(schema, pt.StructType):
        schema = to_spark_schema(schema)
    if isinstance(df, pd.DataFrame):
        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
            # pyspark < 3.4 does not support pandas 2 when doing
            # createDataFrame, see this issue:
            # https://stackoverflow.com/a/75926954/12309438
            # this is a workaround with the cost of memory and speed.
            if schema is None:
                schema = to_spark_schema(fa.get_schema(df))
            df = fa.as_fugue_df(df).as_array(type_safe=True)
        return session.createDataFrame(df, schema=schema)
    if isinstance(df, DataFrame):
        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
            if schema is None:
                schema = to_spark_schema(df.schema)
            return session.createDataFrame(df.as_array(type_safe=True), schema=schema)
        return session.createDataFrame(df.as_pandas(), schema=schema)
    else:
        return session.createDataFrame(df, schema=schema)
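The branch above is the heart of the fix. As a rough standalone sketch of the same fallback (not the fugue code itself; the sample frame, column names, and local session below are made up for illustration), the idea is to hand Spark plain Python rows plus an explicit schema whenever pandas 2 meets pyspark < 3.4, instead of letting createDataFrame interpret the pandas object:

import pandas as pd
import pyspark.sql.types as pt
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

if pd.__version__ >= "2" and spark.version < "3.4":
    # Same guard as above: older Spark cannot digest pandas 2 frames directly,
    # so box the values into plain Python rows and pass an explicit schema.
    schema = pt.StructType(
        [
            pt.StructField("a", pt.LongType()),
            pt.StructField("b", pt.StringType()),
        ]
    )
    rows = pdf.astype(object).values.tolist()
    sdf = spark.createDataFrame(rows, schema=schema)
else:
    # pandas 1.x, or Spark >= 3.4, can take the pandas frame as-is.
    sdf = spark.createDataFrame(pdf)
sdf.show()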


def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
if pd.__version__ < "2" or not any(
isinstance(x.dataType, (pt.TimestampType, pt.TimestampNTZType))
isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
for x in df.schema.fields
):
return df.toPandas()
else:

def serialize(dfs): # pragma: no cover
for df in dfs:
data = cloudpickle.dumps(df)
yield pd.DataFrame([[data]], columns=["data"])
def serialize(dfs): # pragma: no cover
for df in dfs:
data = pickle.dumps(df)
yield pd.DataFrame([[data]], columns=["data"])

sdf = df.mapInPandas(serialize, schema="data binary")
return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
sdf = df.mapInPandas(serialize, schema="data binary")
return pd.concat(pickle.loads(x.data) for x in sdf.collect())
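The pickle round trip above replaces a plain toPandas call only when pandas 2 and timestamp columns are both involved, presumably because toPandas mishandles those dtypes on the affected Spark versions. A rough self-contained sketch of the same pattern (the helper name collect_via_pickle and the sample data are made up; only the mapInPandas/pickle/pd.concat combination comes from the change above):

import pickle

import pandas as pd
from pyspark.sql import DataFrame, SparkSession, functions as F


def collect_via_pickle(df: DataFrame) -> pd.DataFrame:
    # Convert each partition to pandas on the executor, pickle it into a
    # single binary cell, and let the driver unpickle and concatenate the
    # pieces, bypassing toPandas entirely.
    def serialize(pdfs):
        for pdf in pdfs:
            yield pd.DataFrame([[pickle.dumps(pdf)]], columns=["data"])

    blobs = df.mapInPandas(serialize, schema="data binary")
    return pd.concat(pickle.loads(row.data) for row in blobs.collect())


spark = SparkSession.builder.master("local[1]").getOrCreate()
sdf = spark.range(3).withColumn("ts", F.current_timestamp())
print(collect_via_pickle(sdf))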


# TODO: the following function always set nullable to true,
14 changes: 8 additions & 6 deletions fugue_spark/_utils/io.py
@@ -1,16 +1,18 @@
from typing import Any, Callable, Dict, List, Optional, Union

import pyspark.sql as ps
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrame
from fugue._utils.io import FileParser, save_df
from fugue_spark.dataframe import SparkDataFrame
from fugue_spark._utils.convert import to_schema, to_spark_schema
from pyspark.sql import SparkSession
from triad.collections import Schema
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw
from triad.collections.dict import ParamDict

from fugue._utils.io import FileParser, save_df
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrame
from fugue_spark.dataframe import SparkDataFrame

from .convert import to_schema, to_spark_schema


class SparkIO(object):
35 changes: 11 additions & 24 deletions fugue_spark/execution_engine.py
@@ -43,7 +43,7 @@
from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine

from ._constants import FUGUE_SPARK_CONF_USE_PANDAS_UDF, FUGUE_SPARK_DEFAULT_CONF
from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input
from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input, to_spark_df
from ._utils.io import SparkIO
from ._utils.misc import is_spark_connect as _is_spark_connect, is_spark_dataframe
from ._utils.partition import even_repartition, hash_repartition, rand_repartition
@@ -735,40 +735,29 @@ def _to_df() -> SparkDataFrame:
)
if isinstance(df, SparkDataFrame):
return df
if isinstance(df, ArrowDataFrame):
raw_df: Any = df.as_pandas()
sdf = self.spark_session.createDataFrame(
raw_df, to_spark_schema(df.schema)
)
return SparkDataFrame(sdf, df.schema)
if isinstance(df, (ArrayDataFrame, IterableDataFrame)):
adf = ArrowDataFrame(df.as_array(type_safe=False), df.schema)
raw_df = adf.as_pandas()
sdf = self.spark_session.createDataFrame(
raw_df, to_spark_schema(df.schema)
)
sdf = to_spark_df(self.spark_session, adf, df.schema)
return SparkDataFrame(sdf, df.schema)
if any(pa.types.is_struct(t) for t in df.schema.types):
sdf = self.spark_session.createDataFrame(
df.as_array(type_safe=True), to_spark_schema(df.schema)
sdf = to_spark_df(
self.spark_session, df.as_array(type_safe=True), df.schema
)
else:
sdf = self.spark_session.createDataFrame(
df.as_pandas(), to_spark_schema(df.schema)
)
sdf = to_spark_df(self.spark_session, df, df.schema)
return SparkDataFrame(sdf, df.schema)
if is_spark_dataframe(df):
return SparkDataFrame(df, None if schema is None else to_schema(schema))
if isinstance(df, RDD):
assert_arg_not_none(schema, "schema")
sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
sdf = to_spark_df(self.spark_session, df, schema)
return SparkDataFrame(sdf, to_schema(schema))
if isinstance(df, pd.DataFrame):
if PD_UTILS.empty(df):
temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
sdf = self.spark_session.createDataFrame([], temp_schema)
sdf = to_spark_df(self.spark_session, [], temp_schema)
else:
sdf = self.spark_session.createDataFrame(df)
sdf = to_spark_df(self.spark_session, df)
return SparkDataFrame(sdf, schema)

# use arrow dataframe here to handle nulls in int cols
@@ -778,9 +767,7 @@ def _to_df() -> SparkDataFrame:
adf = ArrowDataFrame(df, to_schema(schema))
map_pos = [i for i, t in enumerate(adf.schema.types) if pa.types.is_map(t)]
if len(map_pos) == 0:
sdf = self.spark_session.createDataFrame(
adf.as_array(), to_spark_schema(adf.schema)
)
sdf = to_spark_df(self.spark_session, adf.as_array(), adf.schema)
else:

def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
@@ -789,8 +776,8 @@ def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
row[p] = dict(row[p])
yield row

sdf = self.spark_session.createDataFrame(
to_dict(adf.as_array_iterable()), to_spark_schema(adf.schema)
sdf = to_spark_df(
self.spark_session, to_dict(adf.as_array_iterable()), adf.schema
)
return SparkDataFrame(sdf, adf.schema)

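Net effect of the engine changes: the createDataFrame calls in this engine now funnel through to_spark_df, so user-level conversions keep working on the old-Spark/new-Pandas combination without callers changing anything. A minimal usage sketch under that assumption (local session; the sample data and schema string are made up):

import pandas as pd
from pyspark.sql import SparkSession

from fugue_spark import SparkExecutionEngine

spark = SparkSession.builder.master("local[1]").getOrCreate()
engine = SparkExecutionEngine(spark)

pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
# With pandas>=2 and pyspark<3.4 this no longer hands the pandas object
# straight to createDataFrame; it is routed through to_spark_df, which
# falls back to a row-based conversion.
sdf = engine.to_df(pdf, "a:long,b:str")
print(sdf.as_pandas())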
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
@@ -1 +1 @@
__version__ = "0.8.4"
__version__ = "0.8.5"
6 changes: 3 additions & 3 deletions setup.py
@@ -21,7 +21,7 @@ def get_version() -> str:
setup(
name="fugue",
version=get_version(),
packages=find_packages(),
packages=find_packages(include=["fugue*"]),
description="An abstraction layer for distributed computation",
long_description=LONG_DESCRIPTION,
long_description_content_type="text/markdown",
@@ -42,7 +42,7 @@ def get_version() -> str:
],
extras_require={
"cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.6"],
"spark": ["pyspark"],
"spark": ["pyspark>=3.1.1"],
"dask": [
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
@@ -62,7 +62,7 @@ def get_version() -> str:
"notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"],
"all": [
"fugue-sql-antlr[cpp]>=0.1.6",
"pyspark",
"pyspark>=3.1.1",
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"ray[data]>=2.0.0",
20 changes: 7 additions & 13 deletions tests/fugue_spark/test_dataframe.py
@@ -12,7 +12,7 @@
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.plugins import get_column_names, rename
from fugue_spark import SparkExecutionEngine
from fugue_spark._utils.convert import to_schema, to_spark_schema
from fugue_spark._utils.convert import to_schema, to_spark_schema, to_spark_df
from fugue_spark.dataframe import SparkDataFrame
from fugue_test.dataframe_suite import DataFrameTests

@@ -42,7 +42,7 @@ def df(self, data: Any = None, schema: Any = None):
return engine.to_df(data, schema=schema).native

def to_native_df(self, pdf: pd.DataFrame) -> Any:
return self.spark_session.createDataFrame(pdf)
return to_spark_df(self.spark_session, pdf)

def test_not_local(self):
assert not fi.is_local(self.df([], "a:int,b:str"))
@@ -131,30 +131,24 @@ def _df(data, schema=None):
session = SparkSession.builder.getOrCreate()
if schema is not None:
pdf = PandasDataFrame(data, to_schema(schema))
df = session.createDataFrame(pdf.native, to_spark_schema(schema))
df = to_spark_df(session, pdf.native, schema)
else:
df = session.createDataFrame(data)
df = to_spark_df(session, data)
return SparkDataFrame(df, schema)


def _test_get_column_names(spark_session):
df = spark_session.createDataFrame(
pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
)
df = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
assert get_column_names(df) == ["0", "1", "2"]


def _test_rename(spark_session):
pdf = spark_session.createDataFrame(
pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"])
)
pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"]))
df = rename(pdf, {})
assert isinstance(df, ps.DataFrame)
assert get_column_names(df) == ["a", "b", "c"]

pdf = spark_session.createDataFrame(
pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
)
pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
df = rename(pdf, {"0": "_0", "1": "_1", "2": "_2"})
assert isinstance(df, ps.DataFrame)
assert get_column_names(df) == ["_0", "_1", "_2"]