PyDeequ is a Python wrapper around Deequ, a "unit testing" framework for data written in Scala. The main motivation of PyDeequ is to enable data science projects to assess the quality of their input data without leaving the Python-based environment. Deequ is built on top of Apache Spark, therefore PyDeequ uses the PySpark API of the Apache Spark project.
The design of the PyDeequ API aims to follow the Fluent API concept of the Deequ Scala project. Four interfaces of Deequ are implemented, but in many cases only with their default parameters. To improve the API coverage, we are happy to receive feedback and contributions to this project.
For further details on the Deequ API design and examples please refer to the following resources:
PyDeequ is tested on Spark 2.3.0 and Deequ 1.0.3-rc2. You need to initialize your pyspark session with the Deequ jar package. The .jar file is available via Maven Central; download it with
wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.3-rc2/deequ-1.0.3-rc2.jar
and start pyspark with
pyspark --jars ./deequ-1.0.3-rc2.jar
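Alternatively, if you run PyDeequ from a script rather than the pyspark shell, the jar can be attached when building the session. A minimal sketch (assuming pyspark is installed and the downloaded jar sits in the working directory; the app name is arbitrary):

```python
from pyspark.sql import SparkSession

# Point spark.jars at the downloaded Deequ jar so the Deequ
# classes are available on the JVM classpath of the session.
spark = (SparkSession.builder
         .appName("pydeequ-demo")
         .config("spark.jars", "./deequ-1.0.3-rc2.jar")
         .getOrCreate())
```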
Currently PyDeequ implements only the basic functionality of Deequ, but hopefully it still brings value to Python-based data science projects. In the following we demonstrate the basic usage of these four functionalities. There are also example files available in src/pydeequ/examples.
The main components of Deequ are
- Analyzers: the main metrics computation engine of Deequ; they output descriptive statistics on tabular input data,
- Profilers: compute column statistics depending on the datatype,
- Constraint verification: predefined data quality checks with threshold values as parameters; they are based on metrics computed by analyzers,
- Constraint suggestion: automated constraint generation based on a set of rules, which profile the data first to come up with useful constraints.
There is a test dataset available in the examples which we use in the following demos.
>>> from pydeequ.examples import test_data
>>> df = spark.createDataFrame(test_data)
>>> df.show()
+------+----+----------+-----+----+
| _1| _2| _3| _4| _5|
+------+----+----------+-----+----+
|thingA|13.0|IN_TRANSIT| true| 5.0|
|thingA| 5.0| DELAYED|false|20.0|
|thingB|null| DELAYED| null|12.0|
|thingC|null|IN_TRANSIT|false| 2.0|
|thingD| 1.0| DELAYED| true|null|
|thingC| 7.0| UNKNOWN| null|null|
|thingC|20.0| UNKNOWN| null| 3.5|
|thingE|20.0| DELAYED|false| 8.2|
+------+----+----------+-----+----+
>>>
The main entry point for the end user is the VerificationSuite. We can add data and checks to the suite via chained method calls. The majority of the checks take column(s) as input plus an assertion function, which takes a metric value as input and outputs a boolean indicating whether the constraint is met. The list of these predefined checks is available here.
>>> from pydeequ.base import VerificationSuite
>>> from pydeequ.checks import Check
>>> from pyspark.sql import DataFrame
>>> r = (VerificationSuite(spark)
... .onData(df)
... .addCheck(Check(spark, 'error', 'examples')
... .hasSize(lambda x: x == 8)
... .isUnique('_2')
... .hasCompleteness('_2', lambda x: x >= 0.75)
... .hasCorrelation('_2', '_5', lambda x: abs(x) > 0.5)
... )) \
... .run()
>>> out = DataFrame(r, spark)
>>> out.show()
+--------+-----------+------------+--------------------+-----------------+--------------------+
| check|check_level|check_status| constraint|constraint_status| constraint_message|
+--------+-----------+------------+--------------------+-----------------+--------------------+
|examples| Error| Error|SizeConstraint(Si...| Success| |
|examples| Error| Error|UniquenessConstra...| Failure|Value: 0.66666666...|
|examples| Error| Error|CompletenessConst...| Success| |
|examples| Error| Error|CorrelationConstr...| Success| |
+--------+-----------+------------+--------------------+-----------------+--------------------+
>>>
>>> spark.sparkContext._gateway.close()
The output of data quality verification is presented as a Spark DataFrame. In the example we performed four checks on the test_data:
- hasSize: checks the number of rows in the dataset and raises an error if it is not equal to 8
- isUnique: checks the column _2 values, raises an error as only 4 values (13, 5, 1, 7) are unique out of the 6 distinct values
- hasCompleteness: checks whether at least 75% of the column _2 values are populated
- hasCorrelation: checks the correlation between two numerical columns
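To build intuition for these metrics, here is a plain-Python sketch (no Spark involved, not PyDeequ API) that recomputes the size, completeness, and uniqueness ratio from the rows shown above, assuming uniqueness is the ratio described in the isUnique bullet (values occurring exactly once over distinct values, counting null as a value):

```python
from collections import Counter

# The _2 column of the test dataset, nulls included.
col2 = [13.0, 5.0, None, None, 1.0, 7.0, 20.0, 20.0]

size = len(col2)                                        # hasSize target: 8
completeness = sum(v is not None for v in col2) / size  # 6 of 8 populated -> 0.75

# Values occurring exactly once, divided by the number of
# distinct values (None forms its own group).
counts = Counter(col2)
unique_ratio = sum(1 for c in counts.values() if c == 1) / len(counts)

print(size, completeness, round(unique_ratio, 4))  # 8 0.75 0.6667
```

This mirrors the verification table: completeness 0.75 just passes the `>= 0.75` assertion, while the uniqueness ratio 0.666… explains the isUnique failure message.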
More examples on verification checks can be found here:
python -m pydeequ.examples.basic_usage
Note that the VerificationSuite starts a JVM callback server, which we need to close at the end of the session.
We can calculate statistics on the dataset with the AnalysisRunner API. As the architecture diagram above shows, metrics are the atomic elements of data "unit testing" in Deequ. NOTE: Metrics can be stored in repositories, but this API is not yet implemented in PyDeequ.
>>> from pyspark.sql import DataFrame
>>> from pydeequ.base import AnalysisRunner
>>> import pydeequ.analyzers as analyzers
>>> r = (AnalysisRunner(spark)
... .onData(df)
... .addAnalyzer(analyzers.Size())
... .addAnalyzer(analyzers.Completeness('_3'))
... .addAnalyzer(analyzers.ApproxCountDistinct('_1'))
... .addAnalyzer(analyzers.Mean('_2'))
... .addAnalyzer(analyzers.Compliance('top values', '_2 > 15'))
... .addAnalyzer(analyzers.Correlation('_2', '_5'))) \
... .run()
>>> out = DataFrame(r, spark)
>>> out.show()
+-----------+----------+-------------------+-------------------+
| entity| instance| name| value|
+-----------+----------+-------------------+-------------------+
| Column| _1|ApproxCountDistinct| 5.0|
| Dataset| *| Size| 8.0|
| Column| _3| Completeness| 1.0|
|Mutlicolumn| _2,_5| Correlation|-0.8310775272166866|
| Column|top values| Compliance| 0.25|
| Column| _2| Mean| 11.0|
+-----------+----------+-------------------+-------------------+
>>>
Metrics are presented as a Spark DataFrame in PyDeequ.
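As a sanity check, two of these metrics can be reproduced in plain Python from the rows shown earlier, under the assumption that Compliance is the fraction of all rows satisfying the predicate and Correlation is a Pearson correlation over rows where both columns are non-null:

```python
import math

# The (_2, _5) pairs from the test dataset.
rows = [(13.0, 5.0), (5.0, 20.0), (None, 12.0), (None, 2.0),
        (1.0, None), (7.0, None), (20.0, 3.5), (20.0, 8.2)]

# Compliance('top values', '_2 > 15'): rows with _2 > 15 over all rows.
compliance = sum(1 for a, _ in rows if a is not None and a > 15) / len(rows)

# Pearson correlation over pairs where both values are present.
pairs = [(a, b) for a, b in rows if a is not None and b is not None]
mx = sum(a for a, _ in pairs) / len(pairs)
my = sum(b for _, b in pairs) / len(pairs)
cov = sum((a - mx) * (b - my) for a, b in pairs)
corr = cov / math.sqrt(sum((a - mx) ** 2 for a, _ in pairs)
                       * sum((b - my) ** 2 for _, b in pairs))

print(compliance, corr)  # 0.25 and roughly -0.83108
```

Both values agree with the Compliance and Correlation rows of the analyzer output above.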
The ColumnProfilerRunner provides column statistics depending on the datatype:
>>> import json
>>> from pydeequ.profiler import ColumnProfilerRunner
>>> r = (ColumnProfilerRunner()
... .onData(df)
... .run())
>>> parsed = json.loads(r)
>>> print(json.dumps(parsed, indent = 4))
{
"columns": [
{
"column": "_3",
"dataType": "String",
"isDataTypeInferred": "true",
"completeness": 1.0,
"approximateNumDistinctValues": 3,
"histogram": [
{
"value": "DELAYED",
"count": 4,
"ratio": 0.5
},
{
"value": "UNKNOWN",
"count": 2,
"ratio": 0.25
},
{
"value": "IN_TRANSIT",
"count": 2,
"ratio": 0.25
}
]
},
{
"column": "_2",
[...]
The output of column profiling is presented in JSON format.
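The histogram in the profile above is just the relative frequency of each value. A plain-Python equivalent (for intuition only, not part of the PyDeequ API):

```python
from collections import Counter

# The _3 status column from the test dataset.
col3 = ["IN_TRANSIT", "DELAYED", "DELAYED", "IN_TRANSIT",
        "DELAYED", "UNKNOWN", "UNKNOWN", "DELAYED"]

counts = Counter(col3)
histogram = [{"value": v, "count": c, "ratio": c / len(col3)}
             for v, c in counts.most_common()]
print(histogram)
# [{'value': 'DELAYED', 'count': 4, 'ratio': 0.5}, ...]
```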
It can be time consuming to define relevant data quality constraints during the data engineering process. The automated constraint suggestion in Deequ provides a useful tool to speed up this process.
>>> import json
>>> from pydeequ.suggestions import ConstraintSuggestionRunner, Rules
>>> r = (ConstraintSuggestionRunner(spark)
... .onData(df)
... .addConstraintRule(Rules.CategoricalRangeRule(spark))
... .run())
>>> parsed = json.loads(r)
>>> print(json.dumps(parsed, indent = 4))
{
"constraint_suggestions": [
{
"constraint_name": "ComplianceConstraint(Compliance('_3' has value range 'DELAYED', 'IN_TRANSIT', 'UNKNOWN',`_3` IN ('DELAYED', 'IN_TRANSIT', 'UNKNOWN')
,None))",
"column_name": "_3",
"current_value": "Compliance: 1",
"description": "'_3' has value range 'DELAYED', 'IN_TRANSIT', 'UNKNOWN'",
"suggesting_rule": "CategoricalRangeRule()",
"rule_description": "If we see a categorical range for a column, we suggest an IS IN (...) constraint",
"code_for_constraint": ".isContainedIn(\"_3\", Array(\"DELAYED\", \"IN_TRANSIT\", \"UNKNOWN\"))"
}
]
}
>>>
The output is presented in JSON format. Constraint suggestions are based on constraint rules; the set of available rules can be found here.
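The idea behind CategoricalRangeRule can be sketched in plain Python: if a column has few distinct values, suggest an IS IN (...) constraint over exactly those values. The cardinality threshold and the helper below are illustrative, not Deequ's actual internals:

```python
def suggest_value_range(column_name, values, max_distinct=10):
    """Illustrative sketch: suggest an isContainedIn constraint for a
    low-cardinality column; return None for high-cardinality columns."""
    distinct = sorted(set(v for v in values if v is not None))
    if len(distinct) > max_distinct:
        return None
    quoted = ", ".join('"%s"' % v for v in distinct)
    return '.isContainedIn("%s", Array(%s))' % (column_name, quoted)

col3 = ["IN_TRANSIT", "DELAYED", "DELAYED", "IN_TRANSIT",
        "DELAYED", "UNKNOWN", "UNKNOWN", "DELAYED"]
print(suggest_value_range("_3", col3))
# .isContainedIn("_3", Array("DELAYED", "IN_TRANSIT", "UNKNOWN"))
```

The generated string matches the `code_for_constraint` field in the suggestion output above.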
The PyDeequ library is licensed under the Apache 2.0 license.