Skip to content

Commit

Permalink
[SPARK-3237][SQL] Fix parquet filters with UDFs
Browse files Browse the repository at this point in the history
Author: Michael Armbrust <[email protected]>

Closes apache#2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.
  • Loading branch information
marmbrus committed Aug 27, 2014
1 parent 3e2864e commit e1139dd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.util.ClosureCleaner

case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
extends Expression {

// Clean the function's closure, unless `function` is null because the instance was created via the default no-arg constructor.
if (function != null) { ClosureCleaner.clean(function) }

type EvaluatedType = Any

def nullable = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.parquet

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

import parquet.filter._
Expand All @@ -25,6 +27,7 @@ import parquet.column.ColumnReader

import com.google.common.io.BaseEncoding

import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -237,7 +240,8 @@ object ParquetFilters {
*/
def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
if (filters.length > 0) {
val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
val serialized: Array[Byte] =
SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
val encoded: String = BaseEncoding.base64().encode(serialized)
conf.set(PARQUET_FILTER_DATA, encoded)
}
Expand All @@ -252,7 +256,7 @@ object ParquetFilters {
val data = conf.get(PARQUET_FILTER_DATA)
if (data != null) {
val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
SparkSqlSerializer.deserialize(decoded)
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
} else {
Seq()
}
Expand Down

0 comments on commit e1139dd

Please sign in to comment.