HdfsUtils using kerberos (#1655)

anistal authored and stratiocommit committed Nov 2, 2016
1 parent 5edcb8c commit 6ef2d37

Showing 4 changed files with 56 additions and 16 deletions.
2 changes: 2 additions & 0 deletions dist/src/main/resources/application.conf
@@ -40,6 +40,8 @@ sparta {
pluginsFolder = jars
executionJarFolder = jarDriver
classpathFolder = classpath
+ principalName = ""
+ keytabPath = ""
}

mesos {
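The empty defaults keep Kerberos disabled until both keys are set. A minimal sketch (hypothetical values, not part of this commit) of how these two keys behave on the Scala side, where an empty string is normalized to None:

```scala
import com.typesafe.config.{Config, ConfigFactory}

import scala.util.Try

// Hypothetical principal/keytab values; with the defaults ("") both resolve to None.
val config: Config = ConfigFactory.parseString(
  """principalName = "sparta/node1@EXAMPLE.COM"
    |keytabPath = "/etc/security/keytabs/sparta.keytab"
  """.stripMargin)

// Same normalization this commit applies in HdfsUtils.apply(config).
def nonEmpty(key: String): Option[String] =
  Try(config.getString(key)).toOption.flatMap(x => if (x == "") None else Some(x))

nonEmpty("principalName") // Some("sparta/node1@EXAMPLE.COM")
nonEmpty("keytabPath")    // Some("/etc/security/keytabs/sparta.keytab")
```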
2 changes: 2 additions & 0 deletions docker/README.md
@@ -9,6 +9,8 @@ MESOS_MASTER: Only needed if EXECUTION_MODE is 'mesos'. Indicates the ip or fqdn of the Mesos master host.
HDFS_MASTER: Only needed if EXECUTION_MODE is 'mesos' or 'yarn'. Indicates the ip or fqdn of the HDFS master host.
HDFS_PORT: Only needed if EXECUTION_MODE is 'mesos' or 'yarn'. Indicates the port of the HDFS master host. (e.g. 8020)
HDFS_USER_NAME: Only needed if EXECUTION_MODE is 'mesos' or 'yarn'. Indicates the hadoop user name. (e.g. stratio)
+ HDFS_PRINCIPAL_NAME: Integration with Kerberos: the principal name used to identify Sparta to Kerberos.
+ HDFS_KEYTAB: Integration with Kerberos: the path of the keytab for that principal.
SPARK_VERSION: Only needed if EXECUTION_MODE is 'mesos', 'yarn' or 'standalone'. Indicates the spark version. (e.g. spark-1.6.2)
HADOOP_SPARK_VERSION: Only needed if EXECUTION_MODE is 'mesos', 'yarn' or 'standalone'. Indicates the compiled hadoop spark version. (e.g. hadoop2.6)
HADOOP_VERSION: Only needed if EXECUTION_MODE is 'mesos', 'yarn' or 'standalone'. Indicates the hadoop version. (e.g. hadoop-2.7.1)
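The two new variables are typically supplied at container start, e.g. `docker run -e HDFS_PRINCIPAL_NAME=... -e HDFS_KEYTAB=...`, and feed the sed substitutions in docker-entrypoint.sh below. A sketch in Scala of the same defaulting contract (illustrative only; the container does this in bash):

```scala
// Mirrors the entrypoint's rules: unset variables fall back to defaults,
// and Kerberos stays off while the principal/keytab values are empty.
val hdfsPort      = sys.env.getOrElse("HDFS_PORT", "8020")
val hdfsUserName  = sys.env.getOrElse("HDFS_USER_NAME", "stratio")
val principalName = sys.env.getOrElse("HDFS_PRINCIPAL_NAME", "")
val hdfsKeytab    = sys.env.getOrElse("HDFS_KEYTAB", "")
```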
13 changes: 12 additions & 1 deletion docker/docker-entrypoint.sh
@@ -21,20 +21,31 @@
if [[ ! -v HDFS_PORT ]]; then
HDFS_PORT=8020
fi
+ if [[ ! -v HDFS_PRINCIPAL_NAME ]]; then
+   HDFS_PRINCIPAL_NAME=""
+ fi
+ if [[ ! -v HDFS_KEYTAB ]]; then
+   HDFS_KEYTAB=""
+ fi
if [[ ! -v HDFS_USER_NAME ]]; then
HDFS_USER_NAME=stratio
fi
if [[ ! -v SPARK_MASTER ]]; then
SPARK_MASTER="local[*]"
fi
if [[ ! -v MESOS_MASTER ]]; then
MESOS_MASTER=localhost:7077
fi


sed -i "s|executionMode.*|executionMode = \"${EXECUTION_MODE}\"|" ${SPARTA_CONF_FILE}
sed -i "s|connectionString.*|connectionString = \""${ZOOKEEPER_HOST}"\"|" ${SPARTA_CONF_FILE}
sed -i "s|hdfsMaster.*|hdfsMaster = \"${HDFS_MASTER}\"|" ${SPARTA_CONF_FILE}
sed -i "s|hdfsPort.*|hdfsPort = \"${HDFS_PORT}\"|" ${SPARTA_CONF_FILE}
sed -i "s|hadoopUserName.*|hadoopUserName = \"${HDFS_USER_NAME}\"|" ${SPARTA_CONF_FILE}
sed -i "s|principalName.*|principalName = \"${HDFS_PRINCIPAL_NAME}\"|" ${SPARTA_CONF_FILE}
sed -i "s|keytabPath.*|keytabPath = \"${HDFS_KEYTAB}\"|" ${SPARTA_CONF_FILE}

if [[ ! -v EXECUTION_MODE || "${EXECUTION_MODE}" == "local" ]]; then
sed -i "s|spark.master.*|spark.master = \"${SPARK_MASTER}\"|" ${SPARTA_CONF_FILE}
fi
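Each sed line replaces a whole key line in ${SPARTA_CONF_FILE}, because the pattern `key.*` matches to the end of the line. The same substitution expressed as a Scala sketch (hypothetical input; the real work is done by sed):

```scala
// The pattern "principalName.*" matches the rest of the line, so the whole key
// line is replaced with the quoted environment value, exactly as sed does above.
val principal = "sparta/node1@EXAMPLE.COM"  // hypothetical HDFS_PRINCIPAL_NAME
val before    = "principalName = \"\""      // the default written in application.conf
val after     = before.replaceAll("principalName.*", "principalName = \"" + principal + "\"")
// after == principalName = "sparta/node1@EXAMPLE.COM"
```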
55 changes: 40 additions & 15 deletions …/com/stratio/sparta/driver/util/HdfsUtils.scala
@@ -15,22 +15,21 @@
*/
package com.stratio.sparta.driver.util

- import java.io.BufferedInputStream
- import java.io.File
- import java.io.FileInputStream
- import java.io.InputStream
+ import java.io._
+ import java.security.PrivilegedExceptionAction
+
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.spark.deploy.SparkHadoopUtil

import scala.util.Try

import akka.event.slf4j.SLF4JLogging
import com.typesafe.config.Config
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.FileStatus
- import org.apache.hadoop.fs.FileSystem
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FSDataOutputStream, FileStatus, FileSystem, Path}

- case class HdfsUtils(dfs: FileSystem, userName: String) {
+ case class HdfsUtils(dfs: FileSystem, userName: String, ugiOption: Option[UserGroupInformation] = None) {

def getFiles(path: String): Array[FileStatus] = dfs.listStatus(new Path(path))

@@ -40,7 +39,18 @@ case class HdfsUtils(dfs: FileSystem, userName: String) {

def write(path: String, destPath: String, overwrite: Boolean = false): Int = {
val file = new File(path)
-     val out = dfs.create(new Path(s"$destPath${file.getName}"))
+     val out = ugiOption match {
+       case Some(ugi) =>
+         ugi.doAs(new PrivilegedExceptionAction[FSDataOutputStream]() {
+           override def run(): FSDataOutputStream = {
+             dfs.create(new Path(s"$destPath${file.getName}"))
+           }
+         })
+       case None =>
+         dfs.create(new Path(s"$destPath${file.getName}"))
+     }

val in = new BufferedInputStream(new FileInputStream(file))
val bytesCopied = Try(IOUtils.copy(in, out))
IOUtils.closeQuietly(in)
@@ -65,8 +75,12 @@ object HdfsUtils extends SLF4JLogging {
new Configuration()
)

-   def apply(user: String, conf: Configuration): HdfsUtils = {
-     Option(System.getenv("HADOOP_CONF_DIR")).foreach(hadoopConfDir => {
+   def apply(user: String,
+             conf: Configuration,
+             principalNameOption: Option[String],
+             keytabPathOption: Option[String]): HdfsUtils = {
+     Option(System.getenv("HADOOP_CONF_DIR")).foreach(
+       hadoopConfDir => {
val hdfsCoreSitePath = new Path(s"$hadoopConfDir/core-site.xml")
val hdfsHDFSSitePath = new Path(s"$hadoopConfDir/hdfs-site.xml")
val yarnSitePath = new Path(s"$hadoopConfDir/yarn-site.xml")
@@ -77,15 +91,26 @@
}
)

log.debug(s"Configuring HDFS with master: ${conf.get(DefaultFSProperty)} and user: $user")
-     val defaultUri = FileSystem.getDefaultUri(conf)
-
-     new HdfsUtils(FileSystem.get(defaultUri, conf, user), user)
+     val ugi =
+       if (principalNameOption.isDefined && keytabPathOption.isDefined) {
+         val principalName = principalNameOption.getOrElse(
+           throw new IllegalStateException("principalName can not be null"))
+         val keytabPath = keytabPathOption.getOrElse(
+           throw new IllegalStateException("keytabPath can not be null"))
+
+         UserGroupInformation.setConfiguration(conf)
+         Option(UserGroupInformation.loginUserFromKeytabAndReturnUGI(principalName, keytabPath))
+       } else None
+
+     new HdfsUtils(FileSystem.get(conf), user, ugi)
}

def apply(config: Option[Config]): HdfsUtils = {
val user = config.map(_.getString("hadoopUserName")).getOrElse("stratio")
-     apply(user, hdfsConfiguration(config))
+     val principalName: Option[String] =
+       Try(config.get.getString("principalName")).toOption.flatMap(x => if (x == "") None else Some(x))
+     val keytabPath: Option[String] =
+       Try(config.get.getString("keytabPath")).toOption.flatMap(x => if (x == "") None else Some(x))
+     apply(user, hdfsConfiguration(config), principalName, keytabPath)
}
}
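Taken together, a short usage sketch of the new API (hypothetical values; assumes the HDFS master is reachable via the hdfsMaster/hdfsPort settings or HADOOP_CONF_DIR). With a non-empty principal and keytab, write() runs dfs.create inside ugi.doAs:

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical settings; with Kerberos enabled, write() goes through ugi.doAs.
val config = ConfigFactory.parseString(
  """hadoopUserName = "stratio"
    |principalName = "sparta/node1@EXAMPLE.COM"
    |keytabPath = "/etc/security/keytabs/sparta.keytab"
  """.stripMargin)

val hdfs = HdfsUtils(Some(config))
// Copies the local jar into HDFS; destPath is a directory-style prefix.
hdfs.write("/tmp/sparta-plugin.jar", "/user/stratio/sparta/jars/")
```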
