From 4113389aa13b3b8304967fa90e31d49519be5753 Mon Sep 17 00:00:00 2001 From: Edwin Chan Date: Mon, 31 Oct 2022 23:56:25 +0800 Subject: [PATCH] feat: minimal spark deployment (#1132) * Initial commit of spark-branch with all packed logic/tests in semi-working state --- .github/workflows/tests.yml | 87 ++++ MANIFEST.in | 3 + Makefile | 11 + requirements-spark.txt | 5 + requirements-test.txt | 1 + src/pandas_profiling/__init__.py | 5 + src/pandas_profiling/config.py | 36 ++ src/pandas_profiling/model/alerts.py | 5 +- src/pandas_profiling/model/correlations.py | 4 - src/pandas_profiling/model/describe.py | 43 +- .../model/pandas/describe_counts_pandas.py | 1 + src/pandas_profiling/model/spark/__init__.py | 34 ++ .../model/spark/correlations_spark.py | 146 +++++++ .../model/spark/dataframe_spark.py | 54 +++ .../model/spark/describe_boolean_spark.py | 27 ++ .../model/spark/describe_categorical_spark.py | 28 ++ .../model/spark/describe_counts_spark.py | 55 +++ .../model/spark/describe_date_spark.py | 23 + .../model/spark/describe_generic_spark.py | 32 ++ .../model/spark/describe_numeric_spark.py | 138 ++++++ .../model/spark/describe_supported_spark.py | 33 ++ .../model/spark/duplicates_spark.py | 54 +++ .../model/spark/missing_spark.py | 95 ++++ .../model/spark/sample_spark.py | 43 ++ .../model/spark/summary_spark.py | 104 +++++ .../model/spark/table_spark.py | 58 +++ src/pandas_profiling/profile_report.py | 8 +- .../report/structure/overview.py | 26 +- .../structure/variables/render_categorical.py | 15 +- .../report/structure/variables/render_date.py | 32 +- tests/backends/spark_backend/example.py | 108 +++++ .../spark_backend/test_correlations_spark.py | 80 ++++ .../spark_backend/test_descriptions_spark.py | 409 ++++++++++++++++++ .../backends/spark_backend/test_duplicates.py | 39 ++ .../spark_backend/test_missing_spark.py | 28 ++ .../spark_backend/test_report_spark.py | 23 + .../spark_backend/test_sample_spark.py | 67 +++ tests/unit/test_describe.py | 1 - venv/spark.yml | 23 + 39 files changed, 1935 insertions(+), 49 deletions(-) create mode 100644 requirements-spark.txt create mode 100644 src/pandas_profiling/model/spark/__init__.py create mode 100644 src/pandas_profiling/model/spark/correlations_spark.py create mode 100644 src/pandas_profiling/model/spark/dataframe_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_boolean_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_categorical_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_counts_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_date_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_generic_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_numeric_spark.py create mode 100644 src/pandas_profiling/model/spark/describe_supported_spark.py create mode 100644 src/pandas_profiling/model/spark/duplicates_spark.py create mode 100644 src/pandas_profiling/model/spark/missing_spark.py create mode 100644 src/pandas_profiling/model/spark/sample_spark.py create mode 100644 src/pandas_profiling/model/spark/summary_spark.py create mode 100644 src/pandas_profiling/model/spark/table_spark.py create mode 100644 tests/backends/spark_backend/example.py create mode 100644 tests/backends/spark_backend/test_correlations_spark.py create mode 100644 tests/backends/spark_backend/test_descriptions_spark.py create mode 100644 tests/backends/spark_backend/test_duplicates.py create mode 100644 tests/backends/spark_backend/test_missing_spark.py create mode 100644 tests/backends/spark_backend/test_report_spark.py create mode 100644 tests/backends/spark_backend/test_sample_spark.py create mode 100644 venv/spark.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 04e5f444c..e60cd6053 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -131,4 +131,91 @@ jobs: - run: make test_cov + - uses: actions/cache@v2 + if: startsWith(runner.os, 'Windows') + with: + path: ~\AppData\Local\pip\Cache + key: ${{ runner.os }}-${{ matrix.pandas }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.pandas }}-pip- + - run: | + pip install --upgrade pip setuptools wheel + pip install -r requirements.txt "${{ matrix.pandas }}" "${{ matrix.numpy }}" + pip install -r requirements-test.txt + - run: make install + - run: make test_cov - run: codecov -F py${{ matrix.python-version }}-${{ matrix.os }}-${{ matrix.pandas }}-${{ matrix.numpy }} + + + test_spark: + runs-on: ${{ matrix.os }} + continue-on-error: True + strategy: + matrix: + os: [ ubuntu-latest ] + python-version: [3.7, 3.8] + pandas: ["pandas==0.25.3", "pandas==1.0.5", "pandas>1.1"] + spark: ["2.3.0", "2.4.7", "3.0.1"] + hadoop: [ 2.7 ] + numpy: ["numpy==1.19.5"] + java_home: [ /usr/lib/jvm/java-8-openjdk-amd64 ] + exclude: + - python-version: 3.8 + spark: "2.3.0" + - python-version: 3.8 + spark: "2.4.7" +# - os: macos-latest +# python-version: 3.6 +# pandas: ">1.1" +# - os: windows-2016 +# python-version: 3.6 +# pandas: ">1.1" + + name: Tests Spark | python ${{ matrix.python-version }}, ${{ matrix.os }}, spark${{ matrix.spark }}, ${{ matrix.pandas }}, ${{ matrix.numpy }} + env: + JAVA_HOME: ${{ matrix.java_home }} + SPARK_VERSION: ${{ matrix.spark }} + HADOOP_VERSION: ${{ matrix.hadoop }} + SPARK_DIRECTORY: ${{ github.workspace }}/../ + SPARK_HOME: ${{ github.workspace }}/../spark/ + steps: + - uses: actions/checkout@v2 + - name: Setup python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + - uses: actions/cache@v2 + if: startsWith(runner.os, 'Linux') + with: + path: ~/.cache/pip + key: ${{ runner.os }}-${{ matrix.pandas }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.pandas }}-pip-\ + - uses: actions/cache@v2 + if: startsWith(runner.os, 'macOS') + with: + path: ~/Library/Caches/pip + key: ${{ runner.os }}-${{ matrix.pandas }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.pandas }}-pip- + - uses: actions/cache@v2 + if: startsWith(runner.os, 'Windows') + with: + path: ~\AppData\Local\pip\Cache + key: ${{ runner.os }}-${{ matrix.pandas }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.pandas }}-pip- + - run: | + pip install --upgrade pip setuptools wheel + pip install pytest-spark>=0.6.0 pyarrow==1.0.1 pyspark=="${{ matrix.spark }}" + pip install -r requirements.txt + pip install -r requirements-test.txt + pip install "${{ matrix.pandas }}" "${{ matrix.numpy }}" + - if: ${{ matrix.spark != '3.0.1' }} + run: echo "ARROW_PRE_0_15_IPC_FORMAT=1" >> $GITHUB_ENV + - run: echo "SPARK_LOCAL_IP=127.0.0.1" >> $GITHUB_ENV + - run: make install + - run: make install-spark-ci + - run: make test_spark + diff --git a/MANIFEST.in b/MANIFEST.in index 8f055167a..95afcc539 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -13,6 +13,9 @@ recursive-include src/pandas_profiling/report/presentation/flavours/html/templat # Configuration include src/pandas_profiling/*.yaml +# Spark Dev venv +recursive-include venv *.yml + # Exclude development, docs, testing and example code exclude .pre-commit-config.yaml exclude commitlint.config.js diff --git a/Makefile b/Makefile index 5e6c48975..ac7c09239 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,10 @@ test: pytest --nbval tests/notebooks/ pandas_profiling -h +test_spark: + pytest --spark_home=${SPARK_HOME} tests/backends/spark_backend/ + pandas_profiling -h + test_cov: pytest --cov=. tests/unit/ pytest --cov=. --cov-append tests/issues/ @@ -30,6 +34,13 @@ package: install: pip install -e .[notebook] +install-spark-ci: + sudo apt-get update + sudo apt-get -y install openjdk-8-jdk + curl https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \ + --output ${SPARK_DIRECTORY}/spark.tgz + cd ${SPARK_DIRECTORY} && tar -xvzf spark.tgz && mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} spark + lint: pre-commit run --all-files diff --git a/requirements-spark.txt b/requirements-spark.txt new file mode 100644 index 000000000..ff5d62a3a --- /dev/null +++ b/requirements-spark.txt @@ -0,0 +1,5 @@ +# this provides the recommended pyspark and pyarrow versions for spark to work on pandas-profiling +# note that if you are using pyspark 2.3 or 2.4 and pyarrow >= 0.15, you might need to +# set ARROW_PRE_0_15_IPC_FORMAT=1 in your conf/spark-env.sh for toPandas functions to work properly +pyspark>=2.3.0 +pyarrow>=2.0.0 \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index ba8247cc1..491d50a12 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,6 +2,7 @@ pytest coverage~=6.5 codecov pytest-cov +pytest-spark nbval pyarrow twine>=3.1.1 diff --git a/src/pandas_profiling/__init__.py b/src/pandas_profiling/__init__.py index 50dad76dc..e7f7bddd9 100644 --- a/src/pandas_profiling/__init__.py +++ b/src/pandas_profiling/__init__.py @@ -2,6 +2,7 @@ .. include:: ../../README.md """ +import importlib.util from pandas_profiling.compare_reports import compare from pandas_profiling.controller import pandas_decorator @@ -11,6 +12,10 @@ # backend import pandas_profiling.model.pandas # isort:skip # noqa +spec = importlib.util.find_spec("pyspark") +if spec is not None: + import pandas_profiling.model.spark # isort:skip # noqa + __all__ = [ "pandas_decorator", diff --git a/src/pandas_profiling/config.py b/src/pandas_profiling/config.py index e0fb16629..2294c9f71 100644 --- a/src/pandas_profiling/config.py +++ b/src/pandas_profiling/config.py @@ -351,6 +351,42 @@ def from_file(config_file: Union[Path, str]) -> "Settings": return Settings().parse_obj(data) +class SparkSettings(Settings): + # TO-DO write description + vars: Univariate = Univariate() + + vars.num.low_categorical_threshold = 0 + + infer_dtypes = False + + correlations: Dict[str, Correlation] = { + "spearman": Correlation(key="spearman"), + "pearson": Correlation(key="pearson"), + "kendall": Correlation(key="kendall"), + "cramers": Correlation(key="cramers"), + "phi_k": Correlation(key="phi_k"), + } + correlations["pearson"].calculate = True + correlations["spearman"].calculate = True + correlations["kendall"].calculate = False + correlations["cramers"].calculate = False + correlations["phi_k"].calculate = False + + interactions: Interactions = Interactions() + interactions.continuous = False + + missing_diagrams: Dict[str, bool] = { + "bar": False, + "matrix": False, + "dendrogram": False, + "heatmap": False, + } + + samples: Samples = Samples() + samples.tail = 0 + samples.random = 0 + + class Config: arg_groups: Dict[str, Any] = { "sensitive": { diff --git a/src/pandas_profiling/model/alerts.py b/src/pandas_profiling/model/alerts.py index 287d84318..f740b9d54 100644 --- a/src/pandas_profiling/model/alerts.py +++ b/src/pandas_profiling/model/alerts.py @@ -285,7 +285,6 @@ def supported_alerts(summary: dict) -> List[Alert]: ) ) if summary.get("n_distinct", np.nan) == 1: - summary["mode"] = summary["value_counts_without_nan"].index[0] alerts.append( Alert( alert_type=AlertType.CONSTANT, @@ -379,11 +378,11 @@ def get_alerts( def alert_value(value: float) -> bool: - return not np.isnan(value) and value > 0.01 + return not pd.isna(value) and value > 0.01 def skewness_alert(v: float, threshold: int) -> bool: - return not np.isnan(v) and (v < (-1 * threshold) or v > threshold) + return not pd.isna(v) and (v < (-1 * threshold) or v > threshold) def type_date_alert(series: pd.Series) -> bool: diff --git a/src/pandas_profiling/model/correlations.py b/src/pandas_profiling/model/correlations.py index 305a62ffd..168be05d0 100644 --- a/src/pandas_profiling/model/correlations.py +++ b/src/pandas_profiling/model/correlations.py @@ -89,10 +89,6 @@ def calculate_correlation( Returns: The correlation matrices for the given correlation measures. Return None if correlation is empty. """ - - if len(df) == 0: - return None - correlation_measures = { "auto": Auto, "pearson": Pearson, diff --git a/src/pandas_profiling/model/describe.py b/src/pandas_profiling/model/describe.py index 735de13a7..c92741547 100644 --- a/src/pandas_profiling/model/describe.py +++ b/src/pandas_profiling/model/describe.py @@ -90,21 +90,31 @@ def describe( ] pbar.update() - # Get correlations - correlation_names = get_active_correlations(config) - pbar.total += len(correlation_names) - - correlations = { - correlation_name: progress( - calculate_correlation, pbar, f"Calculate {correlation_name} correlation" - )(config, df, correlation_name, series_description) - for correlation_name in correlation_names - } + # Table statistics + table_stats = progress(get_table_stats, pbar, "Get dataframe statistics")( + config, df, series_description + ) - # make sure correlations is not None - correlations = { - key: value for key, value in correlations.items() if value is not None - } + # Get correlations + if table_stats["n"] != 0: + correlation_names = get_active_correlations(config) + pbar.total += len(correlation_names) + + correlations = { + correlation_name: progress( + calculate_correlation, + pbar, + f"Calculate {correlation_name} correlation", + )(config, df, correlation_name, series_description) + for correlation_name in correlation_names + } + + # make sure correlations is not None + correlations = { + key: value for key, value in correlations.items() if value is not None + } + else: + correlations = {} # Scatter matrix pbar.set_postfix_str("Get scatter matrix") @@ -118,11 +128,6 @@ def describe( get_scatter_plot, pbar, f"scatter {x}, {y}" )(config, df, x, y, interval_columns) - # Table statistics - table_stats = progress(get_table_stats, pbar, "Get dataframe statistics")( - config, df, series_description - ) - # missing diagrams missing_map = get_missing_active(config, table_stats) pbar.total += len(missing_map) diff --git a/src/pandas_profiling/model/pandas/describe_counts_pandas.py b/src/pandas_profiling/model/pandas/describe_counts_pandas.py index 24efdfb9c..9806c606d 100644 --- a/src/pandas_profiling/model/pandas/describe_counts_pandas.py +++ b/src/pandas_profiling/model/pandas/describe_counts_pandas.py @@ -45,6 +45,7 @@ def pandas_describe_counts( "value_counts_without_nan": value_counts_without_nan, } ) + try: summary["value_counts_index_sorted"] = summary[ "value_counts_without_nan" diff --git a/src/pandas_profiling/model/spark/__init__.py b/src/pandas_profiling/model/spark/__init__.py new file mode 100644 index 000000000..9d6a4aec9 --- /dev/null +++ b/src/pandas_profiling/model/spark/__init__.py @@ -0,0 +1,34 @@ +from pandas_profiling.model.spark import ( + correlations_spark, + dataframe_spark, + describe_boolean_spark, + describe_categorical_spark, + describe_counts_spark, + describe_date_spark, + describe_generic_spark, + describe_numeric_spark, + describe_supported_spark, + duplicates_spark, + missing_spark, + sample_spark, + summary_spark, + table_spark, +) + +__all__ = [ + "correlations_spark", + "dataframe_spark", + "describe_boolean_spark", + "describe_categorical_spark", + "describe_counts_spark", + "describe_date_spark", + "describe_generic_spark", + "describe_numeric_spark", + "describe_supported_spark", + "duplicates_spark", + "missing_spark", + "sample_spark", + "sample_spark", + "summary_spark", + "table_spark", +] diff --git a/src/pandas_profiling/model/spark/correlations_spark.py b/src/pandas_profiling/model/spark/correlations_spark.py new file mode 100644 index 000000000..58be2b624 --- /dev/null +++ b/src/pandas_profiling/model/spark/correlations_spark.py @@ -0,0 +1,146 @@ +"""Correlations between variables.""" +from typing import Optional + +import pandas as pd +import phik +import pyspark +from packaging import version +from pyspark.ml.feature import VectorAssembler +from pyspark.ml.stat import Correlation +from pyspark.sql import DataFrame +from pyspark.sql.functions import PandasUDFType, lit, pandas_udf +from pyspark.sql.types import ArrayType, DoubleType, StructField, StructType + +from pandas_profiling.config import Settings +from pandas_profiling.model.correlations import ( + Cramers, + Kendall, + Pearson, + PhiK, + Spearman, +) + +SPARK_CORRELATION_PEARSON = "pearson" +SPARK_CORRELATION_SPEARMAN = "spearman" + + +@Spearman.compute.register(Settings, DataFrame, dict) +def spark_spearman_compute( + config: Settings, df: DataFrame, summary: dict +) -> Optional[pd.DataFrame]: + matrix = _compute_spark_corr_natively( + df, summary, corr_type=SPARK_CORRELATION_SPEARMAN + ) + return pd.DataFrame(matrix, index=df.columns, columns=df.columns) + + +@Pearson.compute.register(Settings, DataFrame, dict) +def spark_pearson_compute( + config: Settings, df: DataFrame, summary: dict +) -> Optional[pd.DataFrame]: + matrix = _compute_spark_corr_natively( + df, summary, corr_type=SPARK_CORRELATION_PEARSON + ) + return pd.DataFrame(matrix, index=df.columns, columns=df.columns) + + +def _compute_spark_corr_natively( + df: DataFrame, summary: dict, corr_type: str +) -> ArrayType: + """ + This function exists as pearson and spearman correlation computations have the + exact same workflow. The syntax is Correlation.corr(dataframe, method="pearson" OR "spearman"), + and Correlation is from pyspark.ml.stat + """ + variables = {column: description["type"] for column, description in summary.items()} + interval_columns = [ + column for column, type_name in variables.items() if type_name == "Numeric" + ] + df = df.select(*interval_columns) + + # convert to vector column first + vector_col = "corr_features" + + assembler_args = {"inputCols": df.columns, "outputCol": vector_col} + + # As handleInvalid was only implemented in spark 2.4.0, we use it only if pyspark version >= 2.4.0 + if version.parse(pyspark.__version__) >= version.parse("2.4.0"): + assembler_args["handleInvalid"] = "skip" + + assembler = VectorAssembler(**assembler_args) + df_vector = assembler.transform(df).select(vector_col) + + # get correlation matrix + matrix = ( + Correlation.corr(df_vector, vector_col, method=corr_type).head()[0].toArray() + ) + return matrix + + +@Kendall.compute.register(Settings, DataFrame, dict) +def spark_kendall_compute( + config: Settings, df: DataFrame, summary: dict +) -> Optional[pd.DataFrame]: + raise NotImplementedError() + + +@Cramers.compute.register(Settings, DataFrame, dict) +def spark_cramers_compute( + config: Settings, df: DataFrame, summary: dict +) -> Optional[pd.DataFrame]: + raise NotImplementedError() + + +@PhiK.compute.register(Settings, DataFrame, dict) +def spark_phi_k_compute( + config: Settings, df: DataFrame, summary: dict +) -> Optional[pd.DataFrame]: + + threshold = config.categorical_maximum_correlation_distinct + intcols = { + key + for key, value in summary.items() + # DateTime currently excluded + # In some use cases, it makes sense to convert it to interval + # See https://github.com/KaveIO/PhiK/issues/7 + if value["type"] == "Numeric" and 1 < value["n_distinct"] + } + + supportedcols = { + key + for key, value in summary.items() + if value["type"] != "Unsupported" and 1 < value["n_distinct"] <= threshold + } + selcols = list(supportedcols.union(intcols)) + + if len(selcols) <= 1: + return None + + # pandas mapped udf works only with a groupby, we force the groupby to operate on all columns at once + # by giving one value to all columns + groupby_df = df.select(selcols).withColumn("groupby", lit(1)) + + # generate output schema for pandas_udf + output_schema_components = [] + for column in selcols: + output_schema_components.append(StructField(column, DoubleType(), True)) + output_schema = StructType(output_schema_components) + + # create the pandas grouped map function to do vectorized kendall within spark itself + @pandas_udf(output_schema, PandasUDFType.GROUPED_MAP) + def spark_phik(pdf: pd.DataFrame) -> pd.DataFrame: + correlation = phik.phik_matrix(df=pdf, interval_cols=list(intcols)) + return correlation + + # return the appropriate dataframe (similar to pandas_df.corr results) + if len(groupby_df.head(1)) > 0: + # perform correlation in spark, and get the results back in pandas + df = pd.DataFrame( + groupby_df.groupby("groupby").apply(spark_phik).toPandas().values, + columns=selcols, + index=selcols, + ) + else: + df = pd.DataFrame() + + return df diff --git a/src/pandas_profiling/model/spark/dataframe_spark.py b/src/pandas_profiling/model/spark/dataframe_spark.py new file mode 100644 index 000000000..5e6e16366 --- /dev/null +++ b/src/pandas_profiling/model/spark/dataframe_spark.py @@ -0,0 +1,54 @@ +import warnings + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.dataframe import check_dataframe, preprocess + + +@check_dataframe.register +def spark_check_dataframe(df: DataFrame) -> None: + # FIXME: never... + if not isinstance(df, DataFrame): + warnings.warn("df is not of type pyspark.sql.dataframe.DataFrame") + + +@preprocess.register +def spark_preprocess(config: Settings, df: DataFrame) -> DataFrame: + """Preprocess the dataframe + + - Appends the index to the dataframe when it contains information + - Rename the "index" column to "df_index", if exists + - Convert the DataFrame's columns to str + + Args: + config: report Settings object + df: the pandas DataFrame + + Returns: + The preprocessed DataFrame + """ + + def _check_column_map_type(df: DataFrame, column_name: str) -> bool: + return str(df.select(column_name).schema[0].dataType).startswith("MapType") + + columns_to_remove = list( + filter(lambda x: _check_column_map_type(df, x), df.columns) + ) + + # raise warning and filter if this isn't empty + if columns_to_remove: + warnings.warn( + f"""spark-profiling does not handle MapTypes. Column(s) { ','.join(columns_to_remove) } will be ignored. + To fix this, consider converting your MapType into a StructTypes of StructFields i.e. + {{'key1':'value1',...}} -> [('key1','value1'), ...], or extracting the key,value pairs out + into individual columns using pyspark.sql.functions.explode. + """ + ) + columns_to_keep = list( + filter(lambda x: not _check_column_map_type(df, x), df.columns) + ) + return df.select(*columns_to_keep) + + else: + return df diff --git a/src/pandas_profiling/model/spark/describe_boolean_spark.py b/src/pandas_profiling/model/spark/describe_boolean_spark.py new file mode 100644 index 000000000..1bc660b7b --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_boolean_spark.py @@ -0,0 +1,27 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_boolean_1d + + +@describe_boolean_1d.register +def describe_boolean_1d_spark( + config: Settings, df: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe a boolean series. + + Args: + series: The Series to describe. + summary: The dict containing the series description so far. + + Returns: + A dict containing calculated series description values. + """ + + value_counts = summary["value_counts"] + + summary.update({"top": value_counts.index[0], "freq": value_counts.iloc[0]}) + + return config, df, summary diff --git a/src/pandas_profiling/model/spark/describe_categorical_spark.py b/src/pandas_profiling/model/spark/describe_categorical_spark.py new file mode 100644 index 000000000..66adbe733 --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_categorical_spark.py @@ -0,0 +1,28 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_categorical_1d + + +@describe_categorical_1d.register +def describe_categorical_1d_spark( + config: Settings, df: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe a categorical series. + + Args: + series: The Series to describe. + summary: The dict containing the series description so far. + + Returns: + A dict containing calculated series description values. + """ + + # FIXME: cat description + redact = config.vars.cat.redact + if not redact: + summary["first_rows"] = df.limit(5).toPandas().squeeze("columns") + + return config, df, summary diff --git a/src/pandas_profiling/model/spark/describe_counts_spark.py b/src/pandas_profiling/model/spark/describe_counts_spark.py new file mode 100644 index 000000000..678917247 --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_counts_spark.py @@ -0,0 +1,55 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_counts + + +@describe_counts.register +def describe_counts_spark( + config: Settings, series: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Counts the values in a series (with and without NaN, distinct). + + Args: + series: Series for which we want to calculate the values. + + Returns: + A dictionary with the count values (with and without NaN, distinct). + """ + + value_counts = series.groupBy(series.columns).count() + value_counts = value_counts.sort("count", ascending=False).persist() + value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True) + + n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first() + if n_missing is None: + n_missing = 0 + else: + n_missing = n_missing["count"] + + # FIXME: reduce to top-n and bottom-n + value_counts_index_sorted = ( + value_counts_index_sorted.limit(200) + .toPandas() + .set_index(series.columns[0], drop=True) + .squeeze(axis="columns") + ) + + summary["n_missing"] = n_missing + summary["value_counts"] = value_counts.persist() + summary["value_counts_index_sorted"] = value_counts_index_sorted + + # this is necessary as freqtables requires value_counts_without_nan + # to be a pandas series. However, if we try to get everything into + # pandas we will definitly crash the server + summary["value_counts_without_nan"] = ( + value_counts.dropna() + .limit(200) + .toPandas() + .set_index(series.columns[0], drop=True) + .squeeze(axis="columns") + ) + + return config, series, summary diff --git a/src/pandas_profiling/model/spark/describe_date_spark.py b/src/pandas_profiling/model/spark/describe_date_spark.py new file mode 100644 index 000000000..b461656cf --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_date_spark.py @@ -0,0 +1,23 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_date_1d + + +@describe_date_1d.register +def describe_date_1d_spark( + config: Settings, df: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe a date series. + + Args: + series: The Series to describe. + summary: The dict containing the series description so far. + + Returns: + A dict containing calculated series description values. + """ + + return config, df, summary diff --git a/src/pandas_profiling/model/spark/describe_generic_spark.py b/src/pandas_profiling/model/spark/describe_generic_spark.py new file mode 100644 index 000000000..4c08bb02f --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_generic_spark.py @@ -0,0 +1,32 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_generic + + +@describe_generic.register +def describe_generic_spark( + config: Settings, df: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe generic series. + Args: + series: The Series to describe. + summary: The dict containing the series description so far. + Returns: + A dict containing calculated series description values. + """ + + # number of observations in the Series + length = df.count() + + summary["n"] = length + summary["p_missing"] = summary["n_missing"] / length + summary["count"] = length - summary["n_missing"] + + # FIXME: This is not correct, but used to fulfil render expectations + # @chanedwin + summary["memory_size"] = 0 + + return config, df, summary diff --git a/src/pandas_profiling/model/spark/describe_numeric_spark.py b/src/pandas_profiling/model/spark/describe_numeric_spark.py new file mode 100644 index 000000000..5e75f9d67 --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_numeric_spark.py @@ -0,0 +1,138 @@ +from typing import Tuple + +import numpy as np +import pyspark.sql.functions as F +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import ( + describe_numeric_1d, + histogram_compute, +) + + +def numeric_stats_spark(df: DataFrame, summary: dict) -> dict: + column = df.columns[0] + + expr = [ + F.mean(F.col(column)).alias("mean"), + F.stddev(F.col(column)).alias("std"), + F.variance(F.col(column)).alias("variance"), + F.min(F.col(column)).alias("min"), + F.max(F.col(column)).alias("max"), + F.kurtosis(F.col(column)).alias("kurtosis"), + F.skewness(F.col(column)).alias("skewness"), + F.sum(F.col(column)).alias("sum"), + ] + return df.agg(*expr).first().asDict() + + +@describe_numeric_1d.register +def describe_numeric_1d_spark( + config: Settings, df: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe a boolean series. + + Args: + series: The Series to describe. + summary: The dict containing the series description so far. + + Returns: + A dict containing calculated series description values. + """ + + stats = numeric_stats_spark(df, summary) + summary["min"] = stats["min"] + summary["max"] = stats["max"] + summary["mean"] = stats["mean"] + summary["std"] = stats["std"] + summary["variance"] = stats["variance"] + summary["skewness"] = stats["skewness"] + summary["kurtosis"] = stats["kurtosis"] + summary["sum"] = stats["sum"] + + value_counts = summary["value_counts"] + + n_infinite = ( + value_counts.where(F.col(df.columns[0]).isin([np.inf, -np.inf])) + .agg(F.sum(F.col("count")).alias("count")) + .first() + ) + if n_infinite is None or n_infinite["count"] is None: + n_infinite = 0 + else: + n_infinite = n_infinite["count"] + summary["n_infinite"] = n_infinite + + n_zeros = value_counts.where(f"{df.columns[0]} = 0").first() + if n_zeros is None: + n_zeros = 0 + else: + n_zeros = n_zeros["count"] + summary["n_zeros"] = n_zeros + + n_negative = ( + value_counts.where(f"{df.columns[0]} < 0") + .agg(F.sum(F.col("count")).alias("count")) + .first() + ) + if n_negative is None or n_negative["count"] is None: + n_negative = 0 + else: + n_negative = n_negative["count"] + summary["n_negative"] = n_negative + + quantiles = config.vars.num.quantiles + quantile_threshold = 0.05 + + summary.update( + { + f"{percentile:.0%}": value + for percentile, value in zip( + quantiles, + df.stat.approxQuantile( + f"{df.columns[0]}", + quantiles, + quantile_threshold, + ), + ) + } + ) + + median = summary["50%"] + + summary["mad"] = df.select( + (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev") + ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0] + + # FIXME: move to fmt + summary["p_negative"] = summary["n_negative"] / summary["n"] + summary["range"] = summary["max"] - summary["min"] + summary["iqr"] = summary["75%"] - summary["25%"] + summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.NaN + summary["p_zeros"] = summary["n_zeros"] / summary["n"] + summary["p_infinite"] = summary["n_infinite"] / summary["n"] + + # TODO - enable this feature + # because spark doesn't have an indexing system, there isn't really the idea of monotonic increase/decrease + # [feature enhancement] we could implement this if the user provides an ordinal column to use for ordering + # ... https://stackoverflow.com/questions/60221841/how-to-detect-monotonic-decrease-in-pyspark + summary["monotonic"] = 0 + + # this function only displays the top N (see config) values for a histogram. + # This might be confusing if there are a lot of values of equal magnitude, but we cannot bring all the values to + # display in pandas display + # the alternative is to do this in spark natively, but it is not trivial + infinity_values = [np.inf, -np.inf] + infinity_index = summary["value_counts_without_nan"].index.isin(infinity_values) + + summary.update( + histogram_compute( + config, + summary["value_counts_without_nan"][~infinity_index].index.values, + summary["n_distinct"], + weights=summary["value_counts_without_nan"][~infinity_index].values, + ) + ) + + return config, df, summary diff --git a/src/pandas_profiling/model/spark/describe_supported_spark.py b/src/pandas_profiling/model/spark/describe_supported_spark.py new file mode 100644 index 000000000..168c0dde6 --- /dev/null +++ b/src/pandas_profiling/model/spark/describe_supported_spark.py @@ -0,0 +1,33 @@ +from typing import Tuple + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.summary_algorithms import describe_supported + + +@describe_supported.register +def describe_supported_spark( + config: Settings, series: DataFrame, summary: dict +) -> Tuple[Settings, DataFrame, dict]: + """Describe a supported series. + Args: + series: The Series to describe. + series_description: The dict containing the series description so far. + Returns: + A dict containing calculated series description values. + """ + + # number of non-NaN observations in the Series + count = summary["count"] + n_distinct = summary["value_counts"].count() + + summary["n_distinct"] = n_distinct + summary["p_distinct"] = n_distinct / count if count > 0 else 0 + + n_unique = summary["value_counts"].where("count == 1").count() + summary["is_unique"] = n_unique == count + summary["n_unique"] = n_unique + summary["p_unique"] = n_unique / count + + return config, series, summary diff --git a/src/pandas_profiling/model/spark/duplicates_spark.py b/src/pandas_profiling/model/spark/duplicates_spark.py new file mode 100644 index 000000000..0227cf95b --- /dev/null +++ b/src/pandas_profiling/model/spark/duplicates_spark.py @@ -0,0 +1,54 @@ +from typing import Any, Dict, Optional, Sequence, Tuple + +import pyspark.sql.functions as F +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.duplicates import get_duplicates + + +@get_duplicates.register(Settings, DataFrame, Sequence) +def spark_get_duplicates( + config: Settings, df: DataFrame, supported_columns: Sequence +) -> Tuple[Dict[str, Any], Optional[DataFrame]]: + """Obtain the most occurring duplicate rows in the DataFrame. + + Args: + config: report Settings object + df: the Pandas DataFrame. + supported_columns: the columns to consider + + Returns: + A subset of the DataFrame, ordered by occurrence. + """ + n_head = config.duplicates.head + + metrics: Dict[str, Any] = {} + if n_head == 0: + return metrics, None + + if not supported_columns or df.count() == 0: + metrics["n_duplicates"] = 0 + metrics["p_duplicates"] = 0.0 + return metrics, None + + duplicates_key = config.duplicates.key + if duplicates_key in df.columns: + raise ValueError( + f"Duplicates key ({duplicates_key}) may not be part of the DataFrame. Either change the " + f" column name in the DataFrame or change the 'duplicates.key' parameter." + ) + + duplicated_df = ( + df.groupBy(df.columns) + .agg(F.count("*").alias(duplicates_key)) + .withColumn(duplicates_key, F.col(duplicates_key).cast("int")) + .filter(F.col(duplicates_key) > 1) + ) + + metrics["n_duplicates"] = duplicated_df.count() + metrics["p_duplicates"] = metrics["n_duplicates"] / df.count() + + return metrics, ( + duplicated_df.orderBy(duplicates_key, ascending=False).limit(n_head).toPandas() + ) diff --git a/src/pandas_profiling/model/spark/missing_spark.py b/src/pandas_profiling/model/spark/missing_spark.py new file mode 100644 index 000000000..def3b76b5 --- /dev/null +++ b/src/pandas_profiling/model/spark/missing_spark.py @@ -0,0 +1,95 @@ +from typing import Any, List, Optional + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.missing import ( + missing_bar, + missing_dendrogram, + missing_heatmap, + missing_matrix, +) +from pandas_profiling.visualisation.missing import ( + plot_missing_bar, + plot_missing_dendrogram, + plot_missing_heatmap, + plot_missing_matrix, +) + + +class MissingnoBarSparkPatch: + """ + Technical Debt : + This is a monkey patching object that allows usage of the library missingno as is for spark dataframes. + This is because missingno library's bar function always applies a isnull().sum() on dataframes in the visualisation + function, instead of allowing just values counts as an entry point. Thus, in order to calculate the + missing values dataframe in spark, we compute it first, then wrap it in this MissingnoBarSparkPatch object which + will be unwrapped by missingno and return the pre-computed value counts. + The best fix to this currently terrible patch is to submit a PR to missingno to separate preprocessing function + (compute value counts from df) and visualisation functions such that we can call the visualisation directly. + Unfortunately, the missingno library people have not really responded to our issues on gitlab. + See https://github.com/ResidentMario/missingno/issues/119. + We could also fork the missingno library and implement some of the code in our database, but that feels + like bad practice as well. + """ + + def __init__( + self, df: DataFrame, columns: List[str] = None, original_df_size: int = None + ): + self.df = df + self.columns = columns + self.original_df_size = original_df_size + + def isnull(self) -> Any: + """ + This patches the .isnull().sum() function called by missingno library + """ + return self # return self to patch .sum() function + + def sum(self) -> DataFrame: + """ + This patches the .sum() function called by missingno library + """ + return self.df # return unwrapped dataframe + + def __len__(self) -> Optional[int]: + """ + This patches the len(df) function called by missingno library + """ + return self.original_df_size + + +@missing_bar.register +def spark_missing_bar(config: Settings, df: DataFrame) -> str: + import pyspark.sql.functions as F + + # FIXME: move to univariate + data_nan_counts = ( + df.agg( + *[F.count(F.when(F.isnull(c) | F.isnan(c), c)).alias(c) for c in df.columns] + ) + .toPandas() + .squeeze(axis="index") + ) + + return plot_missing_bar( + config, + MissingnoBarSparkPatch( + df=data_nan_counts, columns=df.columns, original_df_size=df.count() + ), + ) + + +@missing_matrix.register +def spark_missing_matrix(config: Settings, df: DataFrame) -> str: + return plot_missing_matrix(config, MissingnoBarSparkPatch(df)) + + +@missing_heatmap.register +def spark_missing_heatmap(config: Settings, df: DataFrame) -> str: + return plot_missing_heatmap(config, MissingnoBarSparkPatch(df)) + + +@missing_dendrogram.register +def spark_missing_dendrogram(config: Settings, df: DataFrame) -> str: + return plot_missing_dendrogram(config, MissingnoBarSparkPatch(df)) diff --git a/src/pandas_profiling/model/spark/sample_spark.py b/src/pandas_profiling/model/spark/sample_spark.py new file mode 100644 index 000000000..bab3dfb7e --- /dev/null +++ b/src/pandas_profiling/model/spark/sample_spark.py @@ -0,0 +1,43 @@ +import warnings +from typing import List + +from pyspark.sql.dataframe import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.sample import Sample, get_sample + + +@get_sample.register(Settings, DataFrame) +def spark_get_sample(config: Settings, df: DataFrame) -> List[Sample]: + """Obtains a sample from head and tail of the DataFrame + + Args: + config: Settings object + df: the spark DataFrame + + Returns: + a list of Sample objects + """ + samples: List[Sample] = [] + if len(df.head(1)) == 0: + return samples + + n_head = config.samples.head + if n_head > 0: + samples.append( + Sample(id="head", data=df.limit(n_head).toPandas(), name="First rows") + ) + + n_tail = config.samples.tail + if n_tail > 0: + warnings.warn( + "tail sample not implemented for spark. Set config.samples.n_tail to 0 to disable this warning" + ) + + n_random = config.samples.random + if n_random > 0: + warnings.warn( + "random sample not implemented for spark. Set config.samples.n_random to 0 to disable this warning" + ) + + return samples diff --git a/src/pandas_profiling/model/spark/summary_spark.py b/src/pandas_profiling/model/spark/summary_spark.py new file mode 100644 index 000000000..a6486957e --- /dev/null +++ b/src/pandas_profiling/model/spark/summary_spark.py @@ -0,0 +1,104 @@ +"""Compute statistical description of datasets.""" +import multiprocessing +from typing import Tuple + +import numpy as np +from pyspark.sql import DataFrame +from tqdm import tqdm +from visions import VisionsTypeset + +from pandas_profiling.config import Settings +from pandas_profiling.model.summarizer import BaseSummarizer +from pandas_profiling.model.summary import describe_1d, get_series_descriptions +from pandas_profiling.utils.dataframe import sort_column_names + + +@describe_1d.register +def spark_describe_1d( + config: Settings, + series: DataFrame, + summarizer: BaseSummarizer, + typeset: VisionsTypeset, +) -> dict: + """Describe a series (infer the variable type, then calculate type-specific values). + + Args: + config: report Settings object + series: The Series to describe. + summarizer: Summarizer object + typeset: Typeset + + Returns: + A Series containing calculated series description values. + """ + + # Make sure pd.NA is not in the series + series = series.fillna(np.nan) + + # get `infer_dtypes` (bool) from config + if config.infer_dtypes: + # Infer variable types + vtype = typeset.infer_type(series) + series = typeset.cast_to_inferred(series) + else: + # Detect variable types from pandas dataframe (df.dtypes). + # [new dtypes, changed using `astype` function are now considered] + if str(series.schema[0].dataType).startswith("ArrayType"): + dtype = "ArrayType" + else: + dtype = str(series.schema[0].dataType) + vtype = { + "IntegerType": "Numeric", + "LongType": "Numeric", + "DoubleType": "Numeric", + "StringType": "Categorical", + "ArrayType": "Categorical", + "BooleanType": "Boolean", + "DateType": "DateTime", + "TimestampType": "DateTime", + }[dtype] + + return summarizer.summarize(config, series, dtype=vtype) + + +@get_series_descriptions.register +def spark_get_series_descriptions( + config: Settings, + df: DataFrame, + summarizer: BaseSummarizer, + typeset: VisionsTypeset, + pbar: tqdm, +) -> dict: + series_description = {} + + def multiprocess_1d(args: tuple) -> Tuple[str, dict]: + """Wrapper to process series in parallel. + + Args: + column: The name of the column. + series: The series values. + + Returns: + A tuple with column and the series description. + """ + column, df = args + return column, describe_1d(config, df.select(column), summarizer, typeset) + + args = [(name, df) for name in df.columns] + with multiprocessing.pool.ThreadPool(12) as executor: + for i, (column, description) in enumerate( + executor.imap_unordered(multiprocess_1d, args) + ): + pbar.set_postfix_str(f"Describe variable:{column}") + + # summary clean up for spark + description.pop("value_counts") + + series_description[column] = description + pbar.update() + series_description = {k: series_description[k] for k in df.columns} + + # Mapping from column name to variable type + series_description = sort_column_names(series_description, config.sort) + + return series_description diff --git a/src/pandas_profiling/model/spark/table_spark.py b/src/pandas_profiling/model/spark/table_spark.py new file mode 100644 index 000000000..9140e2b54 --- /dev/null +++ b/src/pandas_profiling/model/spark/table_spark.py @@ -0,0 +1,58 @@ +from collections import Counter + +from pyspark.sql import DataFrame + +from pandas_profiling.config import Settings +from pandas_profiling.model.table import get_table_stats + + +@get_table_stats.register +def spark_get_table_stats( + config: Settings, df: DataFrame, variable_stats: dict +) -> dict: + """General statistics for the DataFrame. + + Args: + config: report Settings object + df: The DataFrame to describe. + variable_stats: Previously calculated statistic on the DataFrame. + + Returns: + A dictionary that contains the table statistics. + """ + n = df.count() + + result = {"n": n, "n_var": len(df.columns)} + + table_stats = { + "n_cells_missing": 0, + "n_vars_with_missing": 0, + "n_vars_all_missing": 0, + } + + for series_summary in variable_stats.values(): + if "n_missing" in series_summary and series_summary["n_missing"] > 0: + table_stats["n_vars_with_missing"] += 1 + table_stats["n_cells_missing"] += series_summary["n_missing"] + if series_summary["n_missing"] == n: + table_stats["n_vars_all_missing"] += 1 + + # without this check we'll get a div by zero error + if result["n"] * result["n_var"] > 0: + table_stats["p_cells_missing"] = ( + table_stats["n_cells_missing"] / (result["n"] * result["n_var"]) + if result["n"] > 0 + else 0 + ) + else: + table_stats["p_cells_missing"] = 0 + + result["p_cells_missing"] = table_stats["p_cells_missing"] + result["n_cells_missing"] = table_stats["n_cells_missing"] + result["n_vars_all_missing"] = table_stats["n_vars_all_missing"] + result["n_vars_with_missing"] = table_stats["n_vars_with_missing"] + + # Variable type counts + result["types"] = dict(Counter([v["type"] for v in variable_stats.values()])) + + return result diff --git a/src/pandas_profiling/profile_report.py b/src/pandas_profiling/profile_report.py index f0a120b02..f9d5ad94c 100644 --- a/src/pandas_profiling/profile_report.py +++ b/src/pandas_profiling/profile_report.py @@ -10,7 +10,7 @@ from typeguard import typechecked from visions import VisionsTypeset -from pandas_profiling.config import Config, Settings +from pandas_profiling.config import Config, Settings, SparkSettings from pandas_profiling.expectations_report import ExpectationsReport from pandas_profiling.model.alerts import AlertType from pandas_profiling.model.describe import describe as describe_df @@ -493,3 +493,9 @@ def compare( from pandas_profiling.compare_reports import compare return compare([self, other], config if config is not None else self.config) + + def get_default_settings(self, df: Any) -> BaseSettings: + if isinstance(df, (pd.DataFrame, pd.Series)): + return Settings() + else: + return SparkSettings() diff --git a/src/pandas_profiling/report/structure/overview.py b/src/pandas_profiling/report/structure/overview.py index 9dbf5695c..90bee69e5 100644 --- a/src/pandas_profiling/report/structure/overview.py +++ b/src/pandas_profiling/report/structure/overview.py @@ -48,19 +48,19 @@ def get_dataset_overview(config: Settings, summary: dict) -> Renderable: }, ] ) - - table_metrics.extend( - [ - { - "name": "Total size in memory", - "value": fmt_bytesize(summary["table"]["memory_size"]), - }, - { - "name": "Average record size in memory", - "value": fmt_bytesize(summary["table"]["record_size"]), - }, - ] - ) + if "memory_size" in summary["table"]: + table_metrics.extend( + [ + { + "name": "Total size in memory", + "value": fmt_bytesize(summary["table"]["memory_size"]), + }, + { + "name": "Average record size in memory", + "value": fmt_bytesize(summary["table"]["record_size"]), + }, + ] + ) dataset_info = Table( table_metrics, name="Dataset statistics", style=config.html.style diff --git a/src/pandas_profiling/report/structure/variables/render_categorical.py b/src/pandas_profiling/report/structure/variables/render_categorical.py index 4cc2f2d8e..910c63350 100644 --- a/src/pandas_profiling/report/structure/variables/render_categorical.py +++ b/src/pandas_profiling/report/structure/variables/render_categorical.py @@ -403,11 +403,13 @@ def render_categorical(config: Settings, summary: dict) -> dict: overview_items = [] - if length: + # length isn't being computed for categorical in spark + if length and "max_length" in summary: length_table, length_histo = render_categorical_length(config, summary, varid) overview_items.append(length_table) - if characters: + # characters isn't being computed for categorical in spark + if characters and "category_alias_counts" in summary: overview_table_char, unitab = render_categorical_unicode(config, summary, varid) overview_items.append(overview_table_char) @@ -444,8 +446,9 @@ def render_categorical(config: Settings, summary: dict) -> dict: ) overview_items.append(sample) + # length isn't being computed in spark. disable rendering string_items: List[Renderable] = [frequency_table] - if length: + if length and "max_length" in summary: string_items.append(length_histo) show = config.plot.cat_freq.show @@ -515,7 +518,8 @@ def render_categorical(config: Settings, summary: dict) -> dict: ), ] - if words: + # words aren't being computed for categorical in spark + if words and "word_counts" in summary: woc = freq_table( freqtable=summary["word_counts"], n=_get_n(summary["word_counts"]), @@ -538,7 +542,8 @@ def render_categorical(config: Settings, summary: dict) -> dict: ) ) - if characters: + # characters aren't being computed for categorical in spark + if characters and "category_alias_counts" in summary: bottom_items.append( Container( [unitab], diff --git a/src/pandas_profiling/report/structure/variables/render_date.py b/src/pandas_profiling/report/structure/variables/render_date.py index a21d89445..afec3ede8 100644 --- a/src/pandas_profiling/report/structure/variables/render_date.py +++ b/src/pandas_profiling/report/structure/variables/render_date.py @@ -57,6 +57,15 @@ def render_date(config: Settings, summary: Dict[str, Any]) -> Dict[str, Any]: ], style=config.html.style, ) + if "histogram" not in summary: + template_variables["top"] = Container([info, table1], sequence_type="grid") + else: + table2 = Table( + [ + {"name": "Minimum", "value": fmt(summary["min"]), "alert": False}, + {"name": "Maximum", "value": fmt(summary["max"]), "alert": False}, + ] + ) table2 = Table( [ @@ -86,9 +95,26 @@ def render_date(config: Settings, summary: Dict[str, Any]) -> Dict[str, Any]: alt="Mini histogram", ) - template_variables["top"] = Container( - [info, table1, table2, mini_histo], sequence_type="grid" - ) + # Bottom + bottom = Container( + [ + Image( + histogram( + config, + summary["histogram"][0], + summary["histogram"][1], + date=True, + ), + image_format=image_format, + alt="Histogram", + caption=f"Histogram with fixed size bins (bins={len(summary['histogram'][1]) - 1})", + name="Histogram", + anchor_id=f"{varid}histogram", + ) + ], + sequence_type="tabs", + anchor_id=summary["varid"], + ) if isinstance(summary["histogram"], list): hist_data = histogram( diff --git a/tests/backends/spark_backend/example.py b/tests/backends/spark_backend/example.py new file mode 100644 index 000000000..06bfd7c9c --- /dev/null +++ b/tests/backends/spark_backend/example.py @@ -0,0 +1,108 @@ +import logging +import warnings +from datetime import date, datetime + +import numpy as np +from matplotlib import MatplotlibDeprecationWarning +from pyspark.sql import SparkSession + +logging.basicConfig(level=logging.INFO) +import pandas as pd + +from pandas_profiling import ProfileReport +from pandas_profiling.config import Settings + +spark_session = ( + SparkSession.builder.appName("SparkProfiling").master("local[*]").getOrCreate() +) + +print(spark_session.sparkContext.uiWebUrl) + +correlation_testdata = pd.DataFrame( + { + "test_num_1": [1, 2, 3, 5, 7, 8, 9, -100, -20, np.inf, -np.inf], + "test_num_2": [11, 12, 13, 15, 17, 18, 4, 1, 4, 10, 20], + "test_num_na1": [1, np.nan, 3, 5, 7, 8, np.nan, 1, np.nan, np.nan, np.nan], + "test_num_na2": [11, np.nan, 13, 15, 17, 18, 4, 11, 1, 2, 3], + "test_num_na3": [11, np.nan, 13, 15, 17, 18, 4, 11, np.nan, 4, 4], + "test_cat_1": [ + "one", + "one", + "one", + "two", + "four", + "four", + "five", + "seven", + "seven", + "seven", + "one", + ], + "test_cat_2": [ + "one", + "one", + "two", + "two", + "three", + "four", + "four", + "two", + "seven", + None, + None, + ], + "test_cat_3": [ + "one", + "one", + "two", + "two", + "three", + "four", + "four", + "two", + "one", + "one", + "one", + ], + "test_bool": [ + True, + False, + True, + False, + True, + False, + True, + False, + True, + False, + True, + ], + "test_date": [date(2019, 5, 11)] * 11, + "test_datetime": [datetime(2019, 5, 11, 3, 3, 3)] * 11, + } +) +upscale = 100 +if upscale > 1: + correlation_testdata = pd.concat([correlation_testdata] * upscale) + +correlation_data_num = spark_session.createDataFrame(correlation_testdata) + +cfg = Settings() +cfg.infer_dtypes = False +cfg.correlations["kendall"].calculate = False +cfg.correlations["cramers"].calculate = False +cfg.correlations["phi_k"].calculate = False +cfg.interactions.continuous = False +cfg.missing_diagrams["bar"] = False +cfg.missing_diagrams["dendrogram"] = False +cfg.missing_diagrams["heatmap"] = False +cfg.missing_diagrams["matrix"] = False +cfg.samples.tail = 0 +cfg.samples.random = 0 + + +# Create and start the monitoring process +warnings.filterwarnings("ignore", category=MatplotlibDeprecationWarning) + +a = ProfileReport(correlation_data_num, config=cfg) +a.to_file("test.html", silent=False) diff --git a/tests/backends/spark_backend/test_correlations_spark.py b/tests/backends/spark_backend/test_correlations_spark.py new file mode 100644 index 000000000..c3b25009b --- /dev/null +++ b/tests/backends/spark_backend/test_correlations_spark.py @@ -0,0 +1,80 @@ +import pandas as pd +import pytest + +from pandas_profiling.config import Settings +from pandas_profiling.model.correlations import Kendall +from pandas_profiling.model.pandas.correlations_pandas import ( + pandas_pearson_compute, + pandas_spearman_compute, +) +from pandas_profiling.model.spark.correlations_spark import ( + spark_pearson_compute, + spark_spearman_compute, +) + + +@pytest.fixture +def correlation_data_num(spark_session): + correlation_testdata = pd.DataFrame( + { + "test_num_1": [1, 2, 3, 5, 7, 8, 9], + "test_num_2": [11, 12, 13, 15, 17, 18, 4], + } + ) + + return spark_session.createDataFrame(correlation_testdata) + + +@pytest.fixture +def correlation_data_cat(spark_session): + correlation_testdata = pd.DataFrame( + { + "test_cat_1": ["one", "one", "one", "two", "two", "four", "four", "five"], + "test_cat_2": ["one", "one", "two", "two", "three", "four", "four", "two"], + "test_cat_3": ["one", "one", "two", "two", "three", "four", "four", "two"], + } + ) + + return spark_session.createDataFrame(correlation_testdata) + + +@pytest.fixture +def correlation_var_types(): + return {"test_num_1": {"type": "Numeric"}, "test_num_2": {"type": "Numeric"}} + + +def test_spearman_spark(correlation_data_num, correlation_var_types): + cfg = Settings() + + res_spark = spark_spearman_compute( + cfg, + correlation_data_num, + correlation_var_types, + ) + + res_pandas = pandas_spearman_compute(cfg, correlation_data_num.toPandas(), {}) + + pd.testing.assert_frame_equal(res_pandas, res_spark) + + +def test_pearson_spark(correlation_data_num, correlation_var_types): + cfg = Settings() + + res_spark = spark_pearson_compute( + cfg, + correlation_data_num, + correlation_var_types, + ) + + res_pandas = pandas_pearson_compute(cfg, correlation_data_num.toPandas(), {}) + + pd.testing.assert_frame_equal(res_pandas, res_spark) + + +def test_kendall_spark(correlation_data_cat): + cfg = Settings() + res_pandas = Kendall.compute(cfg, correlation_data_cat.toPandas(), {}) + + with pytest.raises(NotImplementedError): + res_spark = Kendall.compute(cfg, correlation_data_cat, {}) + pd.testing.assert_frame_equal(res_pandas, res_spark) diff --git a/tests/backends/spark_backend/test_descriptions_spark.py b/tests/backends/spark_backend/test_descriptions_spark.py new file mode 100644 index 000000000..698f9fca9 --- /dev/null +++ b/tests/backends/spark_backend/test_descriptions_spark.py @@ -0,0 +1,409 @@ +import datetime + +import numpy as np +import pandas as pd +import pytest + +from pandas_profiling.config import SparkSettings +from pandas_profiling.model.describe import describe +from pandas_profiling.model.summary import * +from pandas_profiling.model.typeset import ProfilingTypeSet + +check_is_NaN = "pandas_profiling.check_is_NaN" + + +@pytest.fixture +def describe_data(): + data = { + "id": [chr(97 + c) for c in range(1, 9)] + ["d"], + "x": [50, 50, -10, 0, 0, 5, 15, -3, np.nan], + "y": [ + 0.000001, + 654.152, + np.nan, + 15.984512, + 3122, + -3.1415926535, + 111, + 15.9, + 13.5, + ], + "cat": [ + "a", + "long text value", + "Élysée", + "", + None, + "some B.s HTML stuff", + "c", + "c", + "c", + ], + "s1": np.ones(9), + "s2": ["some constant text $ % value {obj} " for _ in range(1, 10)], + "somedate": [ + datetime.date(2011, 7, 4), + datetime.datetime(2022, 1, 1, 13, 57), + datetime.datetime(1990, 12, 9), + np.nan, + datetime.datetime(1990, 12, 9), + datetime.datetime(1970, 12, 9), + datetime.datetime(1972, 1, 2), + datetime.datetime(1970, 12, 9), + datetime.datetime(1970, 12, 9), + ], + "bool_tf": [True, True, False, True, False, True, True, False, True], + "bool_tf_with_nan": [ + True, + False, + False, + False, + False, + True, + True, + False, + np.nan, + ], + "bool_01": [1, 1, 0, 1, 1, 0, 0, 0, 1], + "bool_01_with_nan": [1, 0, 1, 0, 0, 1, 1, 0, np.nan], + "list": [ + [1, 2], + [1, 2], + [1, 2], + [1, 2], + [1, 2], + [1, 2], + [1, 2], + [1, 2], + [1, 2], + ], + "mixed": [1, 2, "a", 4, 5, 6, 7, 8, 9], + "dict": [{"hello": "there", "General": "Kenobi"}], + } + return data + + +@pytest.fixture +def expected_results(): + return { + "id": { + "count": 9, + "cv": check_is_NaN, + "n_distinct": 8, + "histogram": check_is_NaN, + "iqr": check_is_NaN, + "is_unique": False, + "kurtosis": check_is_NaN, + "mad": check_is_NaN, + "max": check_is_NaN, + "mean": check_is_NaN, + "min": check_is_NaN, + "mini_histogram": check_is_NaN, + "n_missing": 0, + "p_missing": 0.0, + "p_distinct": 0.88888888, + "p_zeros": check_is_NaN, + "range": check_is_NaN, + "skewness": check_is_NaN, + "std": check_is_NaN, + "sum": check_is_NaN, + "variance": check_is_NaN, + }, + "x": { + "n": 9, + "count": 9, + "p_missing": 0.0, + "n_distinct": 7, + "n_unique": 5, + "p_distinct": 0.7777777777777778, + "is_unique": False, + "p_unique": 0.5555555555555556, + "n_infinite": 0, + "p_infinite": 0.0, + "n_zeros": 2, + "p_zeros": 0.2222222222222222, + "n_negative": 2, + "p_negative": 0.2222222222222222, + "5%": -10.0, + "25%": -3.0, + "50%": 0.0, + "75%": 15.0, + "95%": 50.0, + "mad": 5.0, + "min": -10.0, + "max": check_is_NaN, + "mean": check_is_NaN, + "std": check_is_NaN, + "variance": check_is_NaN, + "kurtosis": check_is_NaN, + "skewness": check_is_NaN, + "sum": check_is_NaN, + "range": check_is_NaN, + "iqr": 18.0, + "cv": check_is_NaN, + }, + "y": { + "n": "9", + "n": 9, + "count": 9, + "p_missing": 0.0, + "n_distinct": 9, + "n_unique": 9, + "p_distinct": 1.0, + "is_unique": True, + "p_unique": 1.0, + "n_infinite": 0, + "p_infinite": 0.0, + "n_zeros": 0, + "p_zeros": 0.0, + "n_negative": 1, + "p_negative": 0.1111111111111111, + "5%": -3.1415926535, + "25%": 1e-06, + "50%": 15.9, + "75%": 111.0, + "95%": 3122.0, + "mad": 15.9, + "min": -3.1415926535, + "max": check_is_NaN, + "mean": check_is_NaN, + "std": check_is_NaN, + "variance": check_is_NaN, + "kurtosis": check_is_NaN, + "skewness": check_is_NaN, + "sum": check_is_NaN, + "range": check_is_NaN, + "iqr": 110.999999, + "cv": check_is_NaN, + }, + "cat": { + "n": 9, + "count": 8, + "p_missing": 0.1111111111111111, + "n_distinct": 7, + "n_unique": 6, + "p_distinct": 0.875, + "is_unique": False, + "p_unique": 0.75, + }, + "s1": { + "n": 9, + "count": 9, + "p_missing": 0.0, + "n_distinct": 1, + "n_unique": 0, + "p_distinct": 0.1111111111111111, + "is_unique": False, + "p_unique": 0.0, + "n_infinite": 0, + "p_infinite": 0.0, + "n_zeros": 0, + "p_zeros": 0.0, + "n_negative": 0, + "p_negative": 0.0, + "5%": 1.0, + "25%": 1.0, + "50%": 1.0, + "75%": 1.0, + "95%": 1.0, + "mad": 0.0, + "min": 1.0, + "max": 1.0, + "mean": 1.0, + "std": 0.0, + "variance": 0.0, + "kurtosis": check_is_NaN, + "skewness": check_is_NaN, + "sum": 9.0, + "range": 0.0, + "iqr": 0.0, + "cv": 0.0, + }, + "s2": { + "count": 9, + "cv": check_is_NaN, + "n_distinct": 1, + "histogram": check_is_NaN, + "iqr": check_is_NaN, + "is_unique": False, + "kurtosis": check_is_NaN, + "mad": check_is_NaN, + "max": check_is_NaN, + "mean": check_is_NaN, + "min": check_is_NaN, + "mini_histogram": check_is_NaN, + "n_missing": 0, + "p_missing": 0.0, + "p_distinct": 0.1111111111111111, + "p_zeros": check_is_NaN, + "range": check_is_NaN, + "skewness": check_is_NaN, + "std": check_is_NaN, + "sum": check_is_NaN, + "variance": check_is_NaN, + }, + "somedate": { + "n": 9, + "count": 8, + "p_missing": 0.1111111111111111, + "n_distinct": 6, + "n_unique": 4, + "p_distinct": 0.75, + "is_unique": False, + "p_unique": 0.5, + }, + "bool_tf": { + "count": 9, + "n_distinct": 2, + "is_unique": False, + "n_missing": 0, + "p_missing": 0, + "p_distinct": 2 / 9, + }, + "bool_tf_with_nan": { + "n": 9, + "count": 9, + "p_missing": 0.0, + "n_distinct": 2, + "n_unique": 0, + "p_distinct": 0.2222222222222222, + "is_unique": False, + "p_unique": 0.0, + }, + "bool_01": { + "n": 9, + "count": 9, + "cv": 0.9486832980505138, + "n_distinct": 2, + "iqr": 1.0, + "is_unique": False, + "mad": 0, + "max": 1, + "min": 0, + "n_missing": 0, + "p_missing": 0, + "p_distinct": 2 / 9, + "p_zeros": 4 / 9, + "sum": 5, + }, + "bool_01_with_nan": { + "n": 9, + "count": 9, + "p_missing": 0.0, + "n_distinct": 3, + "n_unique": 1, + "p_distinct": 0.3333333333333333, + "is_unique": False, + "p_unique": 0.1111111111111111, + "n_infinite": 0, + "p_infinite": 0.0, + "n_zeros": 4, + "p_zeros": 0.4444444444444444, + "n_negative": 0, + "p_negative": 0.0, + "5%": 0.0, + "25%": 0.0, + "50%": 0.0, + "75%": 1.0, + "95%": 1.0, + "mad": 0.0, + "min": 0.0, + "max": check_is_NaN, + "mean": check_is_NaN, + "std": check_is_NaN, + "variance": check_is_NaN, + "kurtosis": check_is_NaN, + "skewness": check_is_NaN, + "sum": check_is_NaN, + "range": check_is_NaN, + "iqr": 1.0, + "cv": check_is_NaN, + }, + "list": { + "n": 9, + "count": 9, + "n_missing": 0, + "p_missing": 0, + }, + "mixed": { + "n": 9, + "count": 9, + "n_missing": 0, + "p_missing": 0, + }, + "dict": {}, + } + + +@pytest.mark.sparktest +@pytest.mark.parametrize( + "column", + [ + "id", + "x", + "y", + "cat", + "s1", + "s2", + "somedate", + "bool_tf", + "bool_tf_with_nan", + "bool_01", + "bool_01_with_nan", + "list", + "mixed", + "dict", + ], +) +def test_describe_spark_df( + column, + describe_data, + expected_results, + summarizer, + typeset, + spark_session, +): + + cfg = SparkSettings() + + # disable correlations for description test + cfg.correlations["pearson"].calculate = False + cfg.correlations["spearman"].calculate = False + + if column == "mixed": + describe_data[column] = [str(i) for i in describe_data[column]] + if column == "bool_tf_with_nan": + describe_data[column] = [True if i else False for i in describe_data[column]] + sdf = spark_session.createDataFrame(pd.DataFrame({column: describe_data[column]})) + + results = describe(cfg, sdf, summarizer, typeset) + + assert { + "analysis", + "table", + "variables", + "scatter", + "correlations", + "missing", + "package", + "sample", + "duplicates", + "alerts", + } == set(results.keys()), "Not in results" + # Loop over variables + for k, v in expected_results[column].items(): + if v == check_is_NaN: + # test_condition should be True if column not in results, or the result is a nan value + test_condition = k not in results["variables"][column] or pd.isna( + results["variables"][column].get(k, np.NaN) + ) + elif isinstance(v, float): + test_condition = ( + pytest.approx(v, nan_ok=True) == results["variables"][column][k] + ) + else: + test_condition = v == results["variables"][column][k] + + assert ( + test_condition + ), f"Value `{results['variables'][column][k]}` for key `{k}` in column `{column}` is not check_is_NaN" diff --git a/tests/backends/spark_backend/test_duplicates.py b/tests/backends/spark_backend/test_duplicates.py new file mode 100644 index 000000000..019bc92fc --- /dev/null +++ b/tests/backends/spark_backend/test_duplicates.py @@ -0,0 +1,39 @@ +import pandas as pd +import pytest + +from pandas_profiling.config import Settings +from pandas_profiling.model.spark.duplicates_spark import spark_get_duplicates + + +@pytest.fixture +def duplicates_data(spark_session): + correlation_testdata = pd.DataFrame( + { + "test_num_1": [1, 2, 3, 5, 7, 8, 9, 1], + "test_num_2": [11, 12, 13, 15, 17, 18, 4, 11], + } + ) + + return spark_session.createDataFrame(correlation_testdata) + + +def test_spark_get_duplicates_disabled(duplicates_data): + cfg = Settings() + cfg.duplicates.head = 0 + + stats, df = spark_get_duplicates(cfg, duplicates_data, duplicates_data.columns) + assert "n_duplicates" not in stats + assert df is None + + +def test_spark_get_duplicates(duplicates_data): + cfg = Settings() + cfg.duplicates.head = 3 + cfg.duplicates.key = "my_name" + + stats, df = spark_get_duplicates(cfg, duplicates_data, duplicates_data.columns) + assert stats["n_duplicates"] == 1 + assert df.head(1)["my_name"][0] == 2 + assert df.head(1).test_num_1[0] == 1 + assert df.head(1).test_num_2[0] == 11 + assert "count" not in df.head(1) diff --git a/tests/backends/spark_backend/test_missing_spark.py b/tests/backends/spark_backend/test_missing_spark.py new file mode 100644 index 000000000..c0c2d79ef --- /dev/null +++ b/tests/backends/spark_backend/test_missing_spark.py @@ -0,0 +1,28 @@ +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from pandas_profiling.config import Settings +from pandas_profiling.model.spark.missing_spark import spark_missing_bar + + +@pytest.fixture +def missing_data(spark_session): + missing_testdata = pd.DataFrame( + { + "test_num_1": [1, np.nan, 3, 5, 7, 8, np.nan, 1], + "test_num_2": [11, np.nan, 13, 15, 17, 18, 4, 11], + "test_num_3": [11, np.nan, 13, 15, 17, 18, 4, 11], + } + ) + + return spark_session.createDataFrame(missing_testdata) + + +def test_spark_missing_bar(missing_data): + cfg = Settings() + a = spark_missing_bar(cfg, missing_data) + + Path("test.svg").write_text(a) diff --git a/tests/backends/spark_backend/test_report_spark.py b/tests/backends/spark_backend/test_report_spark.py new file mode 100644 index 000000000..a130163d5 --- /dev/null +++ b/tests/backends/spark_backend/test_report_spark.py @@ -0,0 +1,23 @@ +import pandas as pd +import pytest + +from pandas_profiling import ProfileReport + + +@pytest.fixture +def correlation_data_num(spark_session): + correlation_testdata = pd.DataFrame( + { + "test_num_1": [1, 2, 3, 5, 7, 8, 9], + "test_num_2": [11, 12, 13, 15, 17, 18, 4], + } + ) + + return spark_session.createDataFrame(correlation_testdata) + + +def test_report_spark(correlation_data_num): + + a = ProfileReport(correlation_data_num) + + a.to_file("test.html", silent=False) diff --git a/tests/backends/spark_backend/test_sample_spark.py b/tests/backends/spark_backend/test_sample_spark.py new file mode 100644 index 000000000..db753432f --- /dev/null +++ b/tests/backends/spark_backend/test_sample_spark.py @@ -0,0 +1,67 @@ +import pandas as pd +import pytest +from pyspark.sql.types import IntegerType, StringType, StructField, StructType + +from pandas_profiling.config import Settings +from pandas_profiling.model.spark.sample_spark import spark_get_sample + + +# FIXME: Move to data +@pytest.fixture() +def df(spark_session): + data_pandas = pd.DataFrame( + { + "make": ["Jaguar", "MG", "MINI", "Rover", "Lotus"] * 50, + "registration": ["AB98ABCD", "BC99BCDF", "CD00CDE", "DE01DEF", "EF02EFG"] + * 50, + "year": [1998, 1999, 2000, 2001, 2002] * 50, + } + ) + # Turn the data into a Spark DataFrame, self.spark comes from our PySparkTest base class + data_spark = spark_session.createDataFrame(data_pandas) + return data_spark + + +@pytest.fixture() +def df_empty(spark_session): + data_pandas = pd.DataFrame({"make": [], "registration": [], "year": []}) + # Turn the data into a Spark DataFrame, self.spark comes from our PySparkTest base class + schema = StructType( + { + StructField("make", StringType(), True), + StructField("registration", StringType(), True), + StructField("year", IntegerType(), True), + } + ) + data_spark = spark_session.createDataFrame(data_pandas, schema=schema) + return data_spark + + +def test_spark_get_sample(df): + config = Settings() + config.samples.head = 17 + config.samples.random = 0 + config.samples.tail = 0 + + res = spark_get_sample(config, df) + assert len(res) == 1 + assert res[0].id == "head" + assert len(res[0].data) == 17 + + config = Settings() + config.samples.head = 0 + config.samples.random = 0 + config.samples.tail = 0 + + res = spark_get_sample(config, df) + assert len(res) == 0 + + +def test_spark_sample_empty(df_empty): + config = Settings() + config.samples.head = 5 + config.samples.random = 0 + config.samples.tail = 0 + + res = spark_get_sample(config, df_empty) + assert len(res) == 0 diff --git a/tests/unit/test_describe.py b/tests/unit/test_describe.py index 82f6966a5..a6eacbbc5 100644 --- a/tests/unit/test_describe.py +++ b/tests/unit/test_describe.py @@ -323,7 +323,6 @@ def expected_results(): "std": 0, "sum": 9, "variance": 0.0, - "mode": 1.0, "monotonic_increase": True, "monotonic_increase_strict": False, }, diff --git a/venv/spark.yml b/venv/spark.yml new file mode 100644 index 000000000..f056e820f --- /dev/null +++ b/venv/spark.yml @@ -0,0 +1,23 @@ +name: spark-env + +channels: + - conda-forge + - defaults + +dependencies: + - python=3.9 + - ipykernel + - nb_conda + - jupyterlab + - jupyterlab_code_formatter + - isort + - black + - pyspark=3.2.0 + - pip + - matplotlib + - pip: + - delta-spark==1.2.1 + - -r requirements-dev.txt + - -r requirements-pandas.txt + - -r requirements.txt + - -r requirements-spark.txt