From e75d9afb2f282ce79c9fd8bce031287739326a4f Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 17 Dec 2019 11:13:27 +0800 Subject: [PATCH] [SPARK-30094][SQL] Apply current namespace for the single-part table name ### What changes were proposed in this pull request? This PR applies the current namespace for the single-part table name if the current catalog is a non-session catalog. Note that the reason the current namespace is not applied for the session catalog is that the single-part name could be referencing a temp view which doesn't belong to any namespaces. The empty namespace for a table inside the session catalog is resolved by the session catalog implementation. ### Why are the changes needed? It's fixing the following bug where the current namespace is not respected: ``` sql("CREATE TABLE testcat.ns.t USING foo AS SELECT 1 AS id") sql("USE testcat.ns") sql("SHOW CURRENT NAMESPACE").show +-------+---------+ |catalog|namespace| +-------+---------+ |testcat| ns| +-------+---------+ // `t` is not resolved since the current namespace `ns` is not used. sql("DESCRIBE t").show Failed to analyze query: org.apache.spark.sql.AnalysisException: Table not found: t;; ``` ### Does this PR introduce any user-facing change? Yes, the above `DESCRIBE` command will succeed. ### How was this patch tested? Added tests. Closes #26894 from imback82/current_namespace. Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/ResolveCatalogs.scala | 2 +- .../sql/connector/catalog/LookupCatalog.scala | 11 ++++++++++- .../connector/catalog/LookupCatalogSuite.scala | 3 ++- .../catalyst/analysis/ResolveSessionCatalog.scala | 2 +- .../sql/connector/DataSourceV2SQLSuite.scala | 15 +++++++++++++++ .../execution/command/PlanResolutionSuite.scala | 1 + 6 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 3361173c9962f..4487a9f1b6b8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -232,7 +232,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) object NonSessionCatalogAndTable { def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { - case CatalogAndIdentifier(catalog, ident) if !isSessionCatalog(catalog) => + case NonSessionCatalogAndIdentifier(catalog, ident) => Some(catalog -> ident.asMultipartIdentifier) case _ => None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index eaded966d9c7d..080ddf1d027e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -103,7 +103,16 @@ private[sql] trait LookupCatalog extends Logging { def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = { assert(nameParts.nonEmpty) if (nameParts.length == 1) { - Some((currentCatalog, Identifier.of(Array(), nameParts.head))) + // If the current catalog is session catalog, the current namespace is not used because + // the single-part name could be referencing a temp view, which doesn't belong to any + // namespaces. An empty namespace will be resolved inside the session catalog + // implementation when a relation is looked up. + val ns = if (CatalogV2Util.isSessionCatalog(currentCatalog)) { + Array.empty[String] + } else { + catalogManager.currentNamespace + } + Some((currentCatalog, Identifier.of(ns, nameParts.head))) } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog // API does not support view yet, and we have to use v1 commands to deal with global temp diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala index bd467ed878454..b2f27e4740cbe 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala @@ -116,10 +116,11 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) }) when(manager.currentCatalog).thenReturn(catalogs("prod")) + when(manager.currentNamespace).thenReturn(Array.empty[String]) manager } - test("catalog object identifier") { + test("catalog and identifier") { Seq( ("tbl", catalogs("prod"), Seq.empty, "tbl"), ("db.tbl", catalogs("prod"), Seq("db"), "tbl"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 834e99c265e9d..5abde31466d0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -583,7 +583,7 @@ class ResolveSessionCatalog( object SessionCatalogAndTable { def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { - case CatalogAndIdentifier(catalog, ident) if isSessionCatalog(catalog) => + case SessionCatalogAndIdentifier(catalog, ident) => Some(catalog -> ident.asMultipartIdentifier) case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 15381e05d7250..b55e1b92cf60a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1914,6 +1914,21 @@ class DataSourceV2SQLSuite } } + test("SPARK-30094: current namespace is used during table resolution") { + // unset this config to use the default v2 session catalog. + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + + withTable("spark_catalog.t", "testcat.ns.t") { + sql("CREATE TABLE t USING parquet AS SELECT 1") + sql("CREATE TABLE testcat.ns.t USING parquet AS SELECT 2") + + checkAnswer(sql("SELECT * FROM t"), Row(1)) + + sql("USE testcat.ns") + checkAnswer(sql("SELECT * FROM t"), Row(2)) + } + } + private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 49cacbea6dd17..4576c4b25bf52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -111,6 +111,7 @@ class PlanResolutionSuite extends AnalysisTest { } }) when(manager.currentCatalog).thenReturn(testCat) + when(manager.currentNamespace).thenReturn(Array.empty[String]) when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) manager }