Skip to content

Commit

Permalink
[LIVY-538] Add integration tests for thriftserver
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Added a new module containing ITs for the thriftserver. The current tests contain only simple queries which ensure that the thriftserver is working correctly in the distributed environment. Specific checks of functionalities are already tested in the related UTs

## How was this patch tested?

added tests

Author: mark91 <[email protected]>
Author: Marco Gaido <[email protected]>

Closes #131 from mgaido91/LIVY-538.
  • Loading branch information
mgaido91 authored and Marcelo Vanzin committed Jan 3, 2019
1 parent e2d2189 commit bf4056f
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 13 deletions.
80 changes: 80 additions & 0 deletions integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
-->
<cluster.spec>default</cluster.spec>
<skipDeploy>true</skipDeploy>
<livy.test.thrift.enabled>false</livy.test.thrift.enabled>
</properties>

<dependencies>
Expand Down Expand Up @@ -258,6 +259,7 @@
<LIVY_TEST>false</LIVY_TEST>
<LIVY_INTEGRATION_TEST>true</LIVY_INTEGRATION_TEST>
<SPARK_HOME>${project.build.directory}/${spark.bin.name}</SPARK_HOME>
<LIVY_TEST_THRIFT_ENABLED>${livy.test.thrift.enabled}</LIVY_TEST_THRIFT_ENABLED>
</environmentVariables>
<systemProperties>
<cluster.spec>${cluster.spec}</cluster.spec>
Expand All @@ -278,5 +280,83 @@
</property>
</activation>
</profile>
<profile>
<id>thriftserver</id>

<properties>
<livy.test.thrift.enabled>true</livy.test.thrift.enabled>
</properties>

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>livy-thriftserver</artifactId>
<version>${project.version}</version>
</dependency>

<!-- needed for testing thriftserver -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/thriftserver/main/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/thriftserver/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ trait Cluster {
def runLivy(): Unit
def stopLivy(): Unit
def livyEndpoint: String
def jdbcEndpoint: Option[String]
def hdfsScratchDir(): Path

def doAsClusterUser[T](task: => T): T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.livy.test.framework

import java.io._
import java.sql.DriverManager
import javax.servlet.http.HttpServletResponse

import scala.concurrent.duration._
Expand Down Expand Up @@ -126,17 +127,25 @@ object MiniYarnMain extends MiniClusterBase {
}

object MiniLivyMain extends MiniClusterBase {
var livyUrl: Option[String] = None

def start(config: MiniClusterConfig, configPath: String): Unit = {
var livyConf = Map(
protected def baseLivyConf(configPath: String): Map[String, String] = {
val baseConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
val thriftEnabled = sys.env.get("LIVY_TEST_THRIFT_ENABLED")
if (thriftEnabled.nonEmpty && thriftEnabled.forall(_.toBoolean)) {
baseConf + (LivyConf.THRIFT_SERVER_ENABLED.key -> "true")
} else {
baseConf
}
}

def start(config: MiniClusterConfig, configPath: String): Unit = {
var livyConf = baseLivyConf(configPath)

if (Cluster.isRunningOnTravis) {
livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
Expand All @@ -151,7 +160,10 @@ object MiniLivyMain extends MiniClusterBase {
// server. Do it atomically since it's used by MiniCluster to detect when the Livy server
// is up and ready.
eventually(timeout(30 seconds), interval(1 second)) {
val serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
var serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
server.getJdbcUrl.foreach { url =>
serverUrlConf += ("livy.server.thrift.jdbc-url" -> url)
}
saveProperties(serverUrlConf, new File(configPath + "/serverUrl.conf"))
}
}
Expand Down Expand Up @@ -180,6 +192,7 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
private var yarn: Option[ProcessInfo] = None
private var livy: Option[ProcessInfo] = None
private var livyUrl: String = _
private var livyThriftJdbcUrl: Option[String] = None
private var _hdfsScrathDir: Path = _

override def configDir(): File = _configDir
Expand Down Expand Up @@ -242,6 +255,7 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU

val props = loadProperties(confFile)
livyUrl = props("livy.server.server-url")
livyThriftJdbcUrl = props.get("livy.server.thrift.jdbc-url")

// Wait until Livy server responds.
val httpClient = new AsyncHttpClient()
Expand All @@ -257,11 +271,14 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
assert(livy.isDefined)
livy.foreach(stop)
livyUrl = null
livyThriftJdbcUrl = None
livy = None
}

def livyEndpoint: String = livyUrl

def jdbcEndpoint: Option[String] = livyThriftJdbcUrl

private def mkdir(name: String, parent: File = tempDir): File = {
val dir = new File(parent, name)
if (!dir.exists()) {
Expand All @@ -281,7 +298,6 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
// Before starting anything, clean up previous running sessions.
sys.process.Process(s"pkill -f $simpleName") !

val java = sys.props("java.home") + "/bin/java"
val cmd =
Seq(
sys.props("java.home") + "/bin/java",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.test.framework

import java.sql.{Connection, DriverManager, ResultSet}

class BaseThriftIntegrationTestSuite extends BaseIntegrationTestSuite {
private var jdbcUri: String = _

override def beforeAll(): Unit = {
cluster = Cluster.get()
// The JDBC endpoint must contain a valid value
assert(cluster.jdbcEndpoint.isDefined)
jdbcUri = cluster.jdbcEndpoint.get
}

def checkQuery(connection: Connection, query: String)(validate: ResultSet => Unit): Unit = {
val ps = connection.prepareStatement(query)
try {
val rs = ps.executeQuery()
try {
validate(rs)
} finally {
rs.close()
}
} finally {
ps.close()
}
}

def withConnection[T](f: Connection => T): T = {
val connection = DriverManager.getConnection(jdbcUri)
try {
f(connection)
} finally {
connection.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.test

import java.sql.Date

import org.apache.livy.test.framework.BaseThriftIntegrationTestSuite

class JdbcIT extends BaseThriftIntegrationTestSuite {
test("basic JDBC test") {
withConnection { c =>
checkQuery(
c, "select 1, 'a', cast(null as int), 1.2345, CAST('2018-08-06' as date)") { resultSet =>
resultSet.next()
assert(resultSet.getInt(1) == 1)
assert(resultSet.getString(2) == "a")
assert(resultSet.getInt(3) == 0)
assert(resultSet.wasNull())
assert(resultSet.getDouble(4) == 1.2345)
assert(resultSet.getDate(5) == Date.valueOf("2018-08-06"))
assert(!resultSet.next())
}

checkQuery(
c, "select cast(null as string), cast(null as decimal), cast(null as double), " +
"cast(null as date), null") { resultSetWithNulls =>
resultSetWithNulls.next()
assert(resultSetWithNulls.getString(1) == null)
assert(resultSetWithNulls.wasNull())
assert(resultSetWithNulls.getBigDecimal(2) == null)
assert(resultSetWithNulls.wasNull())
assert(resultSetWithNulls.getDouble(3) == 0.0)
assert(resultSetWithNulls.wasNull())
assert(resultSetWithNulls.getDate(4) == null)
assert(resultSetWithNulls.wasNull())
assert(resultSetWithNulls.getString(5) == null)
assert(resultSetWithNulls.wasNull())
assert(!resultSetWithNulls.next())
}

checkQuery(
c, "select array(1.5, 2.4, 1.3), struct('a', 1, 1.5), map(1, 'a', 2, 'b')") { resultSet =>
resultSet.next()
assert(resultSet.getString(1) == "[1.5,2.4,1.3]")
assert(resultSet.getString(2) == "{\"col1\":\"a\",\"col2\":1,\"col3\":1.5}")
assert(resultSet.getString(3) == "{1:\"a\",2:\"b\"}")
assert(!resultSet.next())
}
}
}
}
29 changes: 22 additions & 7 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.livy.server

import java.io.{BufferedInputStream, InputStream}
import java.net.InetAddress
import java.util.concurrent._
import java.util.EnumSet
import javax.servlet._
Expand Down Expand Up @@ -55,6 +56,7 @@ class LivyServer extends Logging {
private var kinitFailCount: Int = 0
private var executor: ScheduledExecutorService = _
private var accessManager: AccessManager = _
private var _thriftServerFactory: Option[ThriftServerFactory] = None

private var ugi: UserGroupInformation = _

Expand Down Expand Up @@ -93,10 +95,8 @@ class LivyServer extends Logging {
livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))

val thriftServerFactory = if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
Some(ThriftServerFactory.getInstance)
} else {
None
if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
_thriftServerFactory = Some(ThriftServerFactory.getInstance)
}

if (UserGroupInformation.isSecurityEnabled) {
Expand Down Expand Up @@ -221,7 +221,7 @@ class LivyServer extends Logging {
mount(context, uiServlet, "/ui/*")
mount(context, staticResourceServlet, "/static/*")
mount(context, uiRedirectServlet(basePath + "/ui/"), "/*")
thriftServerFactory.foreach { factory =>
_thriftServerFactory.foreach { factory =>
mount(context, factory.getServlet(basePath), factory.getServletMappings: _*)
}
} else {
Expand Down Expand Up @@ -280,15 +280,15 @@ class LivyServer extends Logging {

server.start()

thriftServerFactory.foreach {
_thriftServerFactory.foreach {
_.start(livyConf, interactiveSessionManager, sessionStore, accessManager)
}

Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
override def run(): Unit = {
info("Shutting down Livy server.")
server.stop()
thriftServerFactory.foreach(_.stop())
_thriftServerFactory.foreach(_.stop())
}
})

Expand Down Expand Up @@ -355,6 +355,21 @@ class LivyServer extends Logging {
_serverUrl.getOrElse(throw new IllegalStateException("Server not yet started."))
}

/** For ITs only */
def getJdbcUrl: Option[String] = {
_thriftServerFactory.map { _ =>
val additionalUrlParams = if (livyConf.get(THRIFT_TRANSPORT_MODE) == "http") {
"?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
} else {
""
}
val host = Option(livyConf.get(THRIFT_BIND_HOST)).getOrElse(
InetAddress.getLocalHost.getHostAddress)
val port = livyConf.getInt(THRIFT_SERVER_PORT)
s"jdbc:hive2://$host:$port$additionalUrlParams"
}
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
Expand Down

0 comments on commit bf4056f

Please sign in to comment.