
[SPARK-50792][SQL] Format binary data as a binary literal in JDBC. #49452

Open · wants to merge 11 commits into base: master
Support all binary comparison for BLOBs on Oracle
Signed-off-by: Xiaoguang Sun <[email protected]>
sunxiaoguang committed Jan 13, 2025
commit 86a1cdacb913867a0d12223e51d851cfbc15da05
@@ -992,12 +992,29 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
  withTable(tableName) {
    // Create a table with binary column
    val binary = "X'123456'"
+   val lessThanBinary = "X'123455'"
+   val greaterThanBinary = "X'123457'"

    sql(s"CREATE TABLE $tableName (binary_col BINARY)")
    sql(s"INSERT INTO $tableName VALUES ($binary)")

-   val select = s"SELECT * FROM $tableName WHERE binary_col = $binary"
-   assert(spark.sql(select).collect().length === 1, s"Binary literal test failed: $select")
+   def testBinaryLiteral(operator: String, literal: String, expected: Int): Unit = {
+     val sql = s"SELECT * FROM $tableName WHERE binary_col $operator $literal"
+     assert(spark.sql(sql).collect().length === expected, s"Failed to run $sql")
+   }
+
+   testBinaryLiteral("=", binary, 1)
+   testBinaryLiteral(">=", binary, 1)
+   testBinaryLiteral(">=", lessThanBinary, 1)
+   testBinaryLiteral(">", lessThanBinary, 1)
+   testBinaryLiteral("<=", binary, 1)
+   testBinaryLiteral("<=", greaterThanBinary, 1)
+   testBinaryLiteral("<", greaterThanBinary, 1)
+   testBinaryLiteral("<>", greaterThanBinary, 1)
+   testBinaryLiteral("<>", lessThanBinary, 1)
+   testBinaryLiteral("<=>", binary, 1)
+   testBinaryLiteral("<=>", lessThanBinary, 0)
+   testBinaryLiteral("<=>", greaterThanBinary, 0)
  }
}
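The test exercises hex literals such as X'123456'. As background for what "format binary data as a binary literal" means, here is a minimal, hypothetical Java sketch of rendering raw bytes in that form; the helper name toBinaryLiteral is an assumption for illustration, not Spark's actual API.

```java
// Hypothetical helper: render a byte array as a SQL hex binary literal, e.g. X'123456'.
public class BinaryLiteral {
    public static String toBinaryLiteral(byte[] bytes) {
        StringBuilder sb = new StringBuilder("X'");
        for (byte b : bytes) {
            // java.util.Formatter treats negative bytes as unsigned for %X,
            // so each byte becomes exactly two uppercase hex digits.
            sb.append(String.format("%02X", b));
        }
        return sb.append('\'').toString();
    }

    public static void main(String[] args) {
        System.out.println(toBinaryLiteral(new byte[]{0x12, 0x34, 0x56})); // prints X'123456'
    }
}
```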
Contributor:
Please add a similar test case to JDBCV2Suite.

Contributor:
Please prepare the data in tablePreparation.

Author (@sunxiaoguang, Jan 11, 2025):

> tablePreparation

Sorry, I'm not quite familiar with the test infrastructure, so let me confirm this before I make a mistake.

To mix in tablePreparation and dataPreparation from the trait defined in V2JDBCTest.scala, we need to update all the integration tests to call these functions defined in the trait.

And duplicating that extra call across multiple integration tests is OK, am I right?

Author (@sunxiaoguang, Jan 11, 2025):

Hm, I just realized I have to use Spark SQL to create the table, with the types defined in Spark SQL. If I prepare the table and data in tablePreparation and dataPreparation, they will have to be database specific, so the code would definitely have to be duplicated across the connectors for all the databases.

Contributor (@beliefer, Jan 11, 2025):

Yes. If we update the base class JdbcDialect, we should run all the built-in integration tests.
tablePreparation is used to customize the DDL, and I'm afraid Spark SQL cannot cover all the built-in integration tests. But do your best; let's see the result and then make the decision.

Author:

Wait, I just realized each dialect embeds a builder that can override the implementation. Let me give it a try.

Author:

Oracle support is ready for review, PTAL. Thanks.

Contributor:

You can override visitBinaryComparison in OracleSQLBuilder.

Author (@sunxiaoguang, Jan 13, 2025):

There are a couple of threads discussing this topic, so let me copy the comment here in case it's missed.

We can only rewrite a comparison when one of its arguments is a BLOB; in all other cases we have to fall back to the existing implementation. Unfortunately, the signature of visitBinaryComparison accepts everything as strings, which loses the type information needed to tell whether one of the arguments is of binary type.

  protected String visitBinaryComparison(String name, String l, String r) {
    if (name.equals("<=>")) {
      return "((" + l + " IS NOT NULL AND " + r + " IS NOT NULL AND " + l + " = " + r + ") " +
              "OR (" + l + " IS NULL AND " + r + " IS NULL))";
    }
    return l + " " + name + " " + r;
  }

Author:

All tests passed, but downloading the report failed. We could rerun the whole test suite to clear all the checks, but that takes quite some time to finish.
https://github.com/sunxiaoguang/spark/actions/runs/12744731853

}
@@ -62,17 +62,29 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
    super.visitAggregateFunction(funcName, isDistinct, inputs)
  }

- private def generateBlobEquals(lhs: Expression, rhs: Expression): String = {
-   s"DBMS_LOB.COMPARE(${inputToSQL(lhs)}, ${inputToSQL(rhs)}) = 0"
+ private def compareBlob(lhs: Expression, operator: String, rhs: Expression): String = {
+   val l = inputToSQL(lhs)
+   val r = inputToSQL(rhs)
+   val op = if (operator == "<=>") "=" else operator
+   val compare = s"DBMS_LOB.COMPARE($l, $r) $op 0"
+   if (operator == "<=>") {
+     s"(($l IS NOT NULL AND $r IS NOT NULL AND $compare) OR ($l IS NULL AND $r IS NULL))"
+   } else {
+     compare
+   }
  }
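Oracle cannot compare BLOBs with the plain comparison operators, so the dialect routes them through DBMS_LOB.COMPARE, which returns 0 when the two LOBs are equal and a nonzero value otherwise; the null-safe "<=>" case additionally spells out the NULL handling. A hypothetical Java sketch of the same string-building logic as compareBlob, operating on already-rendered SQL fragments rather than Spark expressions:

```java
// Hypothetical mirror of the Scala compareBlob helper: builds Oracle SQL text
// for a BLOB comparison from operand fragments that are already SQL strings.
public class BlobComparison {
    public static String compareBlob(String l, String operator, String r) {
        // "<=>" (null-safe equality) compares via "=" and adds explicit NULL handling.
        String op = operator.equals("<=>") ? "=" : operator;
        String compare = "DBMS_LOB.COMPARE(" + l + ", " + r + ") " + op + " 0";
        if (operator.equals("<=>")) {
            return "((" + l + " IS NOT NULL AND " + r + " IS NOT NULL AND " + compare + ") "
                + "OR (" + l + " IS NULL AND " + r + " IS NULL))";
        }
        return compare;
    }

    public static void main(String[] args) {
        System.out.println(compareBlob("binary_col", "<", "X'123457'"));
        // prints DBMS_LOB.COMPARE(binary_col, X'123457') < 0
    }
}
```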

override def build(expr: Expression): String = expr match {
Contributor:

You just need to override visitBinaryComparison.

Author (@sunxiaoguang, Jan 13, 2025):

We can only rewrite a comparison when one of its arguments is a BLOB; in all other cases we have to fall back to the existing implementation. Unfortunately, the signature of visitBinaryComparison accepts everything as strings, which loses the type information needed to tell whether one of the arguments is of binary type.

  protected String visitBinaryComparison(String name, String l, String r) {
    if (name.equals("<=>")) {
      return "((" + l + " IS NOT NULL AND " + r + " IS NOT NULL AND " + l + " = " + r + ") " +
              "OR (" + l + " IS NULL AND " + r + " IS NULL))";
    }
    return l + " " + name + " " + r;
  }

Author:

All tests passed, but downloading the report failed. We could rerun the whole test suite to clear all the checks, but that takes quite some time to finish.
https://github.com/sunxiaoguang/spark/actions/runs/12744731853

- case e: GeneralScalarExpression if e.name == "=" =>
-   (e.children()(0), e.children()(1)) match {
-     case (lhs: Literal[_], rhs: Expression) if lhs.dataType == BinaryType =>
-       generateBlobEquals(rhs, lhs)
-     case (lhs: Expression, rhs: Literal[_]) if rhs.dataType == BinaryType =>
-       generateBlobEquals(rhs, lhs)
+ case e: GeneralScalarExpression =>
+   e.name() match {
+     case "=" | "<>" | "<=>" | "<" | "<=" | ">" | ">=" =>
+       (e.children()(0), e.children()(1)) match {
+         case (lhs: Literal[_], rhs: Expression) if lhs.dataType == BinaryType =>
+           compareBlob(lhs, e.name, rhs)
+         case (lhs: Expression, rhs: Literal[_]) if rhs.dataType == BinaryType =>
+           compareBlob(lhs, e.name, rhs)
+         case _ => super.build(expr)
+       }
      case _ => super.build(expr)
    }
  case _ => super.build(expr)
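The rewritten match intercepts only the seven comparison operators, and only when at least one operand is a binary literal; everything else falls through to super.build. A hypothetical Java sketch of that dispatch shape (simplified: it omits the "<=>" NULL handling and uses plain strings in place of Spark expressions):

```java
import java.util.Set;

public class Dispatch {
    static final Set<String> COMPARISONS = Set.of("=", "<>", "<=>", "<", "<=", ">", ">=");

    // Use the BLOB-aware rewrite only for supported comparisons where at least
    // one operand is a binary literal; everything else takes the default path,
    // mirroring the "case _ => super.build(expr)" fallbacks above.
    public static String build(String op, String l, String r,
                               boolean lBinaryLiteral, boolean rBinaryLiteral) {
        if (COMPARISONS.contains(op) && (lBinaryLiteral || rBinaryLiteral)) {
            return "DBMS_LOB.COMPARE(" + l + ", " + r + ") " + op + " 0";
        }
        return l + " " + op + " " + r; // default string-based builder
    }

    public static void main(String[] args) {
        System.out.println(build("=", "binary_col", "X'123456'", false, true));
        // prints DBMS_LOB.COMPARE(binary_col, X'123456') = 0
        System.out.println(build("=", "a", "b", false, false));
        // prints a = b
    }
}
```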