Skip to content

Commit

Permalink
[SPARK-30094][SQL] Apply current namespace for the single-part table …
Browse files Browse the repository at this point in the history
…name

### What changes were proposed in this pull request?

This PR applies the current namespace for the single-part table name if the current catalog is a non-session catalog.

Note that the reason the current namespace is not applied for the session catalog is that the single-part name could be referencing a temp view which doesn't belong to any namespaces. The empty namespace for a table inside the session catalog is resolved by the session catalog implementation.

### Why are the changes needed?

It's fixing the following bug where the current namespace is not respected:
```
sql("CREATE TABLE testcat.ns.t USING foo AS SELECT 1 AS id")
sql("USE testcat.ns")
sql("SHOW CURRENT NAMESPACE").show
+-------+---------+
|catalog|namespace|
+-------+---------+
|testcat|       ns|
+-------+---------+

// `t` is not resolved since the current namespace `ns` is not used.
sql("DESCRIBE t").show
Failed to analyze query: org.apache.spark.sql.AnalysisException: Table not found: t;;
```

### Does this PR introduce any user-facing change?

Yes, the above `DESCRIBE` command will succeed.

### How was this patch tested?

Added tests.

Closes apache#26894 from imback82/current_namespace.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
imback82 authored and cloud-fan committed Dec 17, 2019
1 parent 696288f commit e75d9af
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

object NonSessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case CatalogAndIdentifier(catalog, ident) if !isSessionCatalog(catalog) =>
case NonSessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ private[sql] trait LookupCatalog extends Logging {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
if (nameParts.length == 1) {
Some((currentCatalog, Identifier.of(Array(), nameParts.head)))
// If the current catalog is session catalog, the current namespace is not used because
// the single-part name could be referencing a temp view, which doesn't belong to any
// namespaces. An empty namespace will be resolved inside the session catalog
// implementation when a relation is looked up.
val ns = if (CatalogV2Util.isSessionCatalog(currentCatalog)) {
Array.empty[String]
} else {
catalogManager.currentNamespace
}
Some((currentCatalog, Identifier.of(ns, nameParts.head)))
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
})
when(manager.currentCatalog).thenReturn(catalogs("prod"))
when(manager.currentNamespace).thenReturn(Array.empty[String])
manager
}

test("catalog object identifier") {
test("catalog and identifier") {
Seq(
("tbl", catalogs("prod"), Seq.empty, "tbl"),
("db.tbl", catalogs("prod"), Seq("db"), "tbl"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ class ResolveSessionCatalog(

object SessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case CatalogAndIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
case SessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1914,6 +1914,21 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-30094: current namespace is used during table resolution") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)

withTable("spark_catalog.t", "testcat.ns.t") {
sql("CREATE TABLE t USING parquet AS SELECT 1")
sql("CREATE TABLE testcat.ns.t USING parquet AS SELECT 2")

checkAnswer(sql("SELECT * FROM t"), Row(1))

sql("USE testcat.ns")
checkAnswer(sql("SELECT * FROM t"), Row(2))
}
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class PlanResolutionSuite extends AnalysisTest {
}
})
when(manager.currentCatalog).thenReturn(testCat)
when(manager.currentNamespace).thenReturn(Array.empty[String])
when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog)
manager
}
Expand Down

0 comments on commit e75d9af

Please sign in to comment.