Skip to content

Commit

Permalink
Move main Support to Spark 3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Nov 30, 2021
1 parent 29d023e commit 782847d
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ version := "0.2.0"

scalaVersion := "2.12.10"

val sparkVersion = "3.1.0"
val sparkVersion = "3.2.0"

libraryDependencies += "org.apache.spark" %% s"spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% s"spark-core" % sparkVersion % "provided"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionDescription, If, ImplicitCastInputTypes, IsNull, Literal, Or}
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.extra.{ExpressionUtils, FunctionDescription}
import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType, LongType}

Expand All @@ -44,8 +45,8 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType, LongT
note = "",
group = "agg_funcs")
// scalastyle:on line.size.limit
case class RegrCount(y: Expression, x: Expression)
extends DeclarativeAggregate with ImplicitCastInputTypes {
case class RegrCount(left: Expression, right: Expression)
extends DeclarativeAggregate with ImplicitCastInputTypes with BinaryLike[Expression] {
override def prettyName: String = "regr_count"
private lazy val regrCount = AttributeReference(prettyName, LongType, nullable = false)()

Expand All @@ -71,7 +72,10 @@ case class RegrCount(y: Expression, x: Expression)

override def dataType: DataType = LongType

override def children: Seq[Expression] = Seq(y, x)
override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression = {
copy(left = newLeft, right = newRight)
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ case class Age(end: Expression, start: Expression)
}

override def dataType: DataType = CalendarIntervalType

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression = {
copy(end = newLeft, start = newRight)
}
}

object Age {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ case class ArrayAppend(left: Expression, right: Expression) extends BinaryExpres
}

override def prettyName: String = "array_append"

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)
}

object ArrayAppend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ case class ArrayLength(left: Expression, right: Expression)
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, IntegerType)

override def prettyName: String = "array_length"

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ case class Scale(child: Expression) extends UnaryExpression with ImplicitCastInp

override def inputTypes: Seq[AbstractDataType] =
Seq(TypeCollection(DoubleType, DecimalType, IntegralType))

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

object Scale {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ case class SplitPart(text: Expression, delimiter: Expression, field: Expression)

override def prettyName: String = "split_part"

override def children: Seq[Expression] = text :: delimiter :: field :: Nil

override def inputTypes: Seq[AbstractDataType] = StringType :: StringType :: IntegerType :: Nil

override def dataType: DataType = StringType
Expand All @@ -62,6 +60,17 @@ case class SplitPart(text: Expression, delimiter: Expression, field: Expression)
strings(index - 1)
}
}

override def first: Expression = text

override def second: Expression = delimiter

override def third: Expression = field

override protected def withNewChildrenInternal(
newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = {
copy(text = newFirst, delimiter = newSecond, field = newThird)
}
}

object SplitPart {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ case class StringToArray(text: Expression, delimiter: Expression, replaced: Expr

override def prettyName: String = "string_to_array"

override def children: Seq[Expression] = Seq(text, delimiter, replaced)

override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, StringType)

override def dataType: DataType = ArrayType(StringType, containsNull = true)
Expand Down Expand Up @@ -76,6 +74,17 @@ case class StringToArray(text: Expression, delimiter: Expression, replaced: Expr
val inited = if (UTF8String.EMPTY_UTF8.equals(deli)) strings.init else strings
new GenericArrayData(inited.asInstanceOf[Array[Any]])
}

override def first: Expression = text

override def second: Expression = delimiter

override def third: Expression = replaced

override protected def withNewChildrenInternal(
newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = {
copy(text = newFirst, delimiter = newSecond, replaced = newThird)
}
}

object StringToArray {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ case class UnNest(child: Expression) extends UnaryExpression with Generator with
rows
}
}

override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)
}

object UnNest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ abstract class IntervalJustifyLike(
""",
since = "0.1.0")
case class JustifyDays(child: Expression)
extends IntervalJustifyLike(child, justifyDays, "justifyDays")
extends IntervalJustifyLike(child, justifyDays, "justifyDays") {
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Adjust interval so 24-hour time periods are represented as days",
Expand All @@ -79,7 +83,11 @@ case class JustifyDays(child: Expression)
""",
since = "0.1.0")
case class JustifyHours(child: Expression)
extends IntervalJustifyLike(child, justifyHours, "justifyHours")
extends IntervalJustifyLike(child, justifyHours, "justifyHours") {
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

@ExpressionDescription(
usage = "_FUNC_(expr) - Adjust interval using justifyHours and justifyDays, with additional" +
Expand All @@ -91,7 +99,11 @@ case class JustifyHours(child: Expression)
""",
since = "0.1.0")
case class JustifyInterval(child: Expression)
extends IntervalJustifyLike(child, justifyInterval, "justifyInterval")
extends IntervalJustifyLike(child, justifyInterval, "justifyInterval") {
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}


object IntervalJustifyLike {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ case class Char2HexInt(
override def dataType: DataType = StringType

override def prettyName: String = "CHAR2HEXINT"

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

object Char2HexInt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ case class CosineSimilarity(
override def dataType: DataType = DoubleType

override def prettyName: String = "cosine_similarity"

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)
}

object CosineSimilarity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ case class IsFinite(child: Expression) extends UnaryExpression
}

override def prettyName: String = "is_infinite"

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

object IsFinite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ case class IsInfinite(child: Expression) extends UnaryExpression
}

override def prettyName: String = "is_infinite"

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}

}

object IsInfinite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ case class TryExpression(child: Expression) extends UnaryExpression {
|}
|""".stripMargin)
}

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}
}

object TryExpression {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object ExpressionUtils {
ed.note(),
"array_funcs", // meaningless
ed.since(),
ed.deprecated())
ed.deprecated(),
"built-in") // meaningless
} else {
new ExpressionInfo(
exprClz.getSimpleName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class TeradataExtensionsTest extends SparkSessionHelper {
spark.sql(
"create table abcde using parquet as select cast(a as string)," +
" b from values (interval 1 day , 2)," +
" (interval 2 day, 3), (interval 6 month, 0) t(a, b)")
" (interval 2 day, 3), (interval 5 day, 0) t(a, b)")
assert(spark.sql("select try(cast(a as interval) / b) from abcde where b = 0")
.head().isNullAt(0))
}
Expand Down

0 comments on commit 782847d

Please sign in to comment.