
[BAHIR-75] [WIP] Remote HDFS connector for Apache Spark using webhdfs protocol with support for Apache Knox #28

Open
wants to merge 26 commits into master

Conversation

sourav-mazumder

This component implements Hadoop File System (org.apache.hadoop.fs.FileSystem) to provide an alternate mechanism (instead of using a 'webhdfs' or 'swebhdfs' file URI) for Spark to access (read/write) files from/to a remote Hadoop cluster using the webhdfs protocol.

This component takes care of the following requirements related to accessing files (read/write) from/to a remote enterprise Hadoop cluster from a separate Spark cluster:

  1. Support for Apache Knox
  2. Support for passing a user id/password different from the user who started the spark-shell/spark-submit process.
  3. Support for SSL in three modes: ignoring certificate validation, certificate validation through a user-supplied trust store path and password, and automatic creation of a certificate using openssl and keytool.
  4. Optimized retrieval of data from remote HDFS, where each connection gets only its own part of the data.

This component is not a full-fledged implementation of Hadoop File System. It implements only those interfaces that are needed by Spark for reading data from remote HDFS and writing data back to remote HDFS.
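For orientation, the shape of such a minimal read/write subset is sketched below. This is an illustrative skeleton only (the class name and bodies are hypothetical, not the PR's BahirWebHdfsFileSystem); it merely lists the FileSystem members that Spark's read and write paths exercise.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable

// Hypothetical skeleton for illustration; not the code in this PR.
class RemoteWebHdfsFileSystemSketch extends FileSystem {
  private var fsUri: URI = _

  override def initialize(uri: URI, conf: Configuration): Unit = {
    super.initialize(uri, conf)
    fsUri = uri
  }

  override def getScheme: String = "remoteHdfs"
  override def getUri: URI = fsUri

  // Read path: Spark calls getFileStatus/listStatus to plan partitions, then open() in each task.
  override def getFileStatus(f: Path): FileStatus = ???
  override def listStatus(f: Path): Array[FileStatus] = ???
  override def open(f: Path, bufferSize: Int): FSDataInputStream = ???

  // Write path: Spark creates one output file per task, plus directory creation and renames.
  override def create(f: Path, permission: FsPermission, overwrite: Boolean, bufferSize: Int,
      replication: Short, blockSize: Long, progress: Progressable): FSDataOutputStream = ???
  override def mkdirs(f: Path, permission: FsPermission): Boolean = ???
  override def rename(src: Path, dst: Path): Boolean = ???
  override def delete(f: Path, recursive: Boolean): Boolean = ???

  // Remaining abstract members of FileSystem, not needed for Spark's read/write paths.
  override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream =
    throw new UnsupportedOperationException("append is not supported")
  override def setWorkingDirectory(newDir: Path): Unit = ()
  override def getWorkingDirectory: Path = new Path("/")
}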

Example Usage -

Step 1: Set a Hadoop configuration property to define a custom URI scheme of your choice and map it to the class BahirWebHdfsFileSystem. For example -
sc.hadoopConfiguration.set("fs.remoteHdfs.impl","org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem")
You can use any name (apart from the standard URI schemes like hdfs, webhdfs, file, etc. already used by Spark) instead of 'remoteHdfs'. However, the same scheme must then be used when loading (or writing) a file.
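As a side note, the same mapping can also be supplied at submit time through Spark's standard spark.hadoop.* pass-through (any property with that prefix is copied into the Hadoop configuration). A minimal sketch of the two equivalent forms, assuming the example scheme name 'remoteHdfs':

// (a) Programmatically from the shell or driver code, as in Step 1 above.
sc.hadoopConfiguration.set(
  "fs.remoteHdfs.impl",
  "org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem")

// (b) At submit time; Spark copies spark.hadoop.* properties into the Hadoop configuration:
//   spark-submit --conf spark.hadoop.fs.remoteHdfs.impl=org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem ...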

Step 2: Set the user name and password as below -

val userid = "biadmin"
val password = "password"
val userCred = userid + ":" + password
sc.hadoopConfiguration.set("usrCredStr",userCred)

Step 3: Now you are ready to load any file from the remote Hadoop cluster using Spark's standard DataFrame/Dataset APIs. For example -

val filePath = "biginsights/spark-enablement/datasets/NewYorkCity311Service/311_Service_Requests_from_2010_to_Present.csv"
val srvr = "ehaasp-577-mastermanager.bi.services.bluemix.net:8443"
val knoxPath = "gateway/default"
val webhdfsPath = "webhdfs/v1"
val prtcl = "remoteHdfs"
val fullPath = s"$prtcl://$srvr/$knoxPath/$webhdfsPath/$filePath"

val df = spark.read.format("csv").option("header", "true").load(fullPath)

Please note the use of 'gateway/default' and 'webhdfs/v1' for specifying the server-specific information in the path. The first one is specific to Apache Knox and the second one is specific to the webhdfs protocol.

Step 4: To write data back to remote HDFS the following steps can be used (using Spark's standard DataFrame writer) -

val filePathWrite = "biginsights/spark-enablement/datasets/NewYorkCity311Service/Result.csv"
val srvr = "ehaasp-577-mastermanager.bi.services.bluemix.net:8443"
val knoxPath = "gateway/default"
val webhdfsPath = "webhdfs/v1"
val prtcl = "remoteHdfs"
val fullPath = s"$prtcl://$srvr/$knoxPath/$webhdfsPath/$filePathWrite"

df.write.format("csv").option("header", "true").save(fullPath)

We are still working on the following -

  • Unit Testing
  • Code cleanup
  • Examples showcasing various configuration parameters
  • API documentation

sourav-mazumder and others added 26 commits November 15, 2016 15:36
@ckadner
Member

ckadner commented Jan 23, 2017

A few high-level questions before jumping into more detailed code review:

Design

Can you elaborate on differences/limitations/advantages over Hadoop default "webhdfs" scheme? i.e.

  • the main problem you are working around is that the Hadoop WebHdfsFileSystem discards the Knox gateway path when creating the HTTP URL (the principal motivation for this connector), which makes it impossible to use it with Knox
  • the Hadoop WebHdfsFileSystem implements additional interfaces like:
    • DelegationTokenRenewer.Renewable
    • TokenAspect.TokenManagementDelegator
  • performance differences between your approach vs Hadoop's RemoteFS and WebHDFS

Configuration

Some configuration parameters are specific to remote servers and should be specified per server rather than at the connector level (some server-level values may override connector-level ones), i.e.

  • Server level:
    • gateway path (assuming one Knox gateway per server)
    • user name and password
    • authentication method (think Kerberos etc)
  • Connector level:
    • certificate validation options (maybe overridden by server level props)
    • trustStore path
    • webhdfs protocol version (maybe overridden by server level props)
    • buffer sizes, file chunk sizes, retry intervals, etc.
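For illustration only, such a split could be expressed as Hadoop configuration properties along the following lines (all property names below are invented for this sketch and are not part of the PR):

// Hypothetical property names, for illustrating the server/connector split only.
val conf = sc.hadoopConfiguration

// Server level (keyed by some server id, e.g. "bluemix1"):
conf.set("fs.remoteHdfs.server.bluemix1.gatewayPath", "gateway/default")
conf.set("fs.remoteHdfs.server.bluemix1.userCred", "biadmin:password")
conf.set("fs.remoteHdfs.server.bluemix1.authMethod", "basic")

// Connector level (defaults shared across servers, possibly overridden per server):
conf.set("fs.remoteHdfs.certValidation", "truststore")      // or "ignore"
conf.set("fs.remoteHdfs.truststorePath", "/truststore.jks")
conf.set("fs.remoteHdfs.webhdfsVersion", "v1")
conf.set("fs.remoteHdfs.bufferSize", "4194304")             // 4 MB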

Usability

Given that users need to know about the remote Hadoop server configuration (security, gateway path, etc.) for WebHDFS access, would it be nicer if ...

  • users could separately configure server specific properties in a config file or registry object and then in Spark jobs only use <protocol>://<server>:<port>/<resourcePath> without having to provide additional properties and not having to type or copy-paste unintuitive path segments like /gateway/default/webhdfs/v1/
  • there should be an API that allows users to specify configuration parameters as opposed to putting them into the hadoopConfiguration object, i.e.
import org.apache.bahir.webhdfs.RemoteHdfsConnector

// initialize connector
RemoteHdfsConnector.initialize(protocol = "remoteHdfs", serverConfigFile = "/servers.xml", 
  truststorePath = "/truststore.jks")

// add another connection configuration, provide optional nickname "bluemix1"
RemoteHdfsConnector.addServerConfiguration(nickname = "bluemix1",
  host = "ehaasp-577-mastermanager.bi.services.bluemix.net", port = "8443",
  gatewayPath = "gateway/default",  // optional, default is "gateway/default"
  userName = "biadmin", password = "*******")

// load remote file using form <protocol>://<nickname>/<resourcePath>
// instead of equivalent longer form <protocol>://<server>:<port>/<resourcePath>
val df = spark.read.format("csv").option("header", "true")
     .load("remoteHdfs://bluemix1/my_spark_datasets/NewYorkCity311Service.csv")

...

Security

  • what authentication methods are supported besides basic auth (i.e. OAuth, Kerberos, ...)
  • should the connector manage auth tokens, token renewal, etc
  • I don't think the connector should create a truststore, either skip certificate validation or take a user provided truststore path (btw, the current code fails to create a truststore on Mac OS X)

Debugging

  • the code should have logging at INFO, DEBUG, ERROR levels using the Spark logging mechanisms (targeting the Spark log files)
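As a sketch of one option (assuming the connector logs through SLF4J, which is the API Spark's own logging goes through, so messages land in the regular Spark log files):

import org.slf4j.{Logger, LoggerFactory}

// Illustrative sketch only; names and messages are hypothetical.
object WebHdfsLoggingSketch {
  private val log: Logger = LoggerFactory.getLogger(getClass)

  def logReadAttempt(path: String, offset: Long, length: Long, error: Option[Throwable]): Unit = {
    log.info(s"Opening $path over webhdfs")
    log.debug(s"Requesting byte range offset=$offset length=$length")
    error.foreach(e => log.error(s"WebHDFS call for $path failed", e))
  }
}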

Testing

The outstanding unit tests should verify that the connector works with a ...

  • standard Hadoop cluster (unsecured)
  • Hadoop clusters secured by Apache Knox
  • Hadoop clusters secured by other mechanisms like Kerberos
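A possible shape for such tests, sketched with ScalaTest (the host, credentials and file path below are placeholders that would come from test configuration, not from this PR):

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

// Illustrative sketch only; endpoints, credentials and paths are placeholders.
class RemoteWebHdfsSuiteSketch extends FunSuite {

  private def newSession(): SparkSession = {
    val spark = SparkSession.builder().master("local[2]").appName("webhdfs-test").getOrCreate()
    spark.sparkContext.hadoopConfiguration.set(
      "fs.remoteHdfs.impl", "org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem")
    spark
  }

  test("read a CSV file from a Knox-secured cluster") {
    val spark = newSession()
    spark.sparkContext.hadoopConfiguration.set("usrCredStr", "user:password")
    val df = spark.read.format("csv").option("header", "true")
      .load("remoteHdfs://knox-host:8443/gateway/default/webhdfs/v1/tmp/sample.csv")
    assert(df.count() > 0)
  }

  // Similar tests would target an unsecured cluster and a Kerberos-secured cluster.
}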

@snowch

snowch commented Jan 31, 2017

If I need to provide certificates for ssl verification, would I need to create truststores on worker nodes? On some clusters I work with I do not have access to the filesystem on the worker nodes.

Also, I would like users to be able to specify a path to pem certificates rather than having to make them create a truststore.

Is the following command creating servers.xml, or reading from it?

RemoteHdfsConnector.initialize(protocol = "remoteHdfs", serverConfigFile = "/servers.xml", 
  truststorePath = "/truststore.jks")

If it is reading the file, will the user have access to this file on managed Spark environments that don't exist within a Hadoop environment?

@ckadner
Member

ckadner commented Jan 31, 2017

@snowch the code snippet I put under usability in my comment was merely a suggestion for an alternative to using Hadoop configuration properties. I had intended the servers.xml file to contain all of the user's remote Hadoop connections with host, port, username, password, etc. so that this type of configuration would not have to be done in the Spark program. All configuration files and the truststore file would reside on the Spark driver (master node).

In terms of SSL validation, you could opt to bypass certificate validation.

@sourav-mazumder
Author

@ckadner Here are my responses to your comments:

Can you elaborate on differences/limitations/advantages over Hadoop default "webhdfs" scheme? i.e.
the main problem you are working around is that the Hadoop WebHdfsFileSystem discards the Knox gateway path when creating the HTTP URL (the principal motivation for this connector), which makes it impossible to use it with Knox

Yes

the Hadoop WebHdfsFileSystem implements additional interfaces like:
DelegationTokenRenewer.Renewable
TokenAspect.TokenManagementDelegator

In my understanding, this is automatically taken care of by Apache Knox. One of the key goals of Apache Knox is to relieve Hadoop clients of the nitty-gritty of a Hadoop cluster's internal security implementation. So we don't need to handle this in client-level code if the webhdfs request is passing through Apache Knox.

performance differences between your approach vs Hadoop's RemoteFS and WebHDFS

Say a remote Spark cluster needs to read a file of size 2 GB and the Spark cluster spawns 16 connections in parallel to do so. In turn, 16 separate webhdfs calls are made to the remote HDFS. However, though each call starts reading the data from a different point, for each of them the end byte is the end of the file. So the first connection creates an input stream from the 0th byte to the end of the file, the second from 128 MB to the end of the file, the third from 256 MB to the end of the file, and so on. As a result, the amount of data prepared on the server side for the response, transferred over the wire, and read on the client side can potentially be much more than the original file size (for this 2 GB file it can be close to 17 GB). This number increases further with more connections, and for larger files the overhead grows higher still.

In the approach used in this PR, for the above example, the total volume of data read and transferred over the wire is always limited to 2 GB plus a few extra KBs (for record boundary resolution). This overhead grows only slightly (still in the KB range) with more connections, and the increase does not depend on file size. So if a large file (in the GBs) has to be read with a high number of parallel connections, the amount of data processed on the server side, transferred over the wire, and read on the client side is always limited to the original file size plus a few extra KBs (for record boundary resolution).
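To make that concrete: the WebHDFS REST API's OPEN operation accepts offset and length query parameters, so each partition can be constrained to exactly its own byte range. A minimal sketch (the host, file path and partition count are illustrative, and the small record-boundary overlap is omitted):

// Build one ranged OPEN URL per partition so each connection asks only for its own slice.
val host = "knox-host:8443"                   // illustrative
val file = "/tmp/NewYorkCity311Service.csv"   // illustrative
val fileSize = 2L * 1024 * 1024 * 1024        // 2 GB
val numPartitions = 16
val splitSize = (fileSize + numPartitions - 1) / numPartitions

val rangedUrls = (0 until numPartitions).map { i =>
  val offset = i.toLong * splitSize
  val length = math.min(splitSize, fileSize - offset)
  s"https://$host/gateway/default/webhdfs/v1$file?op=OPEN&offset=$offset&length=$length"
}

rangedUrls.foreach(println)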

Configuration
Some configuration parameters are specific to remote servers that should be specified by server not on connector level (some at server level may override connector level), i.e.
Server level:
gateway path (assuming one Knox gateway per server)
user name and password
authentication method (think Kerberos etc)
Connector level:
certificate validation options (maybe overridden by server level props)
trustStore path
webhdfs protocol version (maybe overridden by server level props)
buffer sizes and file chunk sizes retry intervals etc

You are right. However, I would put the two levels as Server level and File level. Some parameters won't change from file to file - they are specific to a remote HDFS server and are therefore Server-level parameters - whereas the values of other parameters can differ from file to file; these are File-level parameters. The Server-level parameters are gateway path, user name/password, webhdfs protocol version, and the certificate validation option (and the other parameters associated with it). The File-level parameters are buffer sizes, file chunk sizes, etc., which can differ from file to file.
I don't see the need for any property at the connector level (i.e. parameters that would be the same across the different remote HDFS servers accessed by the connector). All the properties here relate either to the nature of the remote HDFS server's implementation or to the type of file being accessed. Let me know if I'm missing any aspect here.

Usability
Given that users need to know about the remote Hadoop server configuration (security, gateway path, etc) for WebHDFS access would it be nicer if ...
users could separately configure server specific properties in a config file or registry object
and then in Spark jobs only use <protocol>://<server>:<port>/<resourcePath> without having to provide additional properties

That's a good idea. We can have a set of default values for these parameters based on typical practice/convention. However, those default values can be overridden if specified by the user.

Security
what authentication methods are supported besides basic auth (i.e. OAuth, Kerberos, ...)

Right now this PR supports basic auth at the Knox gateway level. Other authentication mechanisms supported by Apache Knox (SAML, OAuth, CAS, OpenID) are not supported yet.
Apache Knox also takes care of access to a Kerberized Hadoop cluster (where the nodes communicate and authenticate among themselves); that is handled by Apache Knox transparently through the relevant configuration.

should the connector manage auth tokens, token renewal, etc

No. It is internally handled by Apache Knox.

I don't think the connector should create a truststore, either skip certificate validation or take a user provided truststore path (btw, the current code fails to create a truststore on Mac OS X)

On second thought, I'm with you.

Debugging
the code should have logging at INFO, DEBUG, ERROR levels using the Spark logging mechanisms (targeting the Spark log files)

Agreed

Testing
The outstanding unit tests should verify that the connector works with a ...
standard Hadoop cluster (unsecured)

This PR focuses only on secured Hadoop clusters. An unsecured Hadoop cluster can be accessed using the existing webhdfs client library available from Hadoop, so we don't need this.

Hadoop clusters secured by Apache Knox

Yes

Hadoop clusters secured by other mechanisms like Kerberos

We don't need to, as that is more a feature of Apache Knox.

@ckadner
Member

ckadner commented Apr 10, 2017

@sourav-mazumder -- do you have any updates on the progress of this PR?
