Vertica Spark Connector

This component acts as a bridge between Spark and Vertica, allowing the user to either retrieve data from Vertica for processing in Spark, or store processed data from Spark into Vertica.

Why use this connector instead of a more generic JDBC connector? A few reasons:

  • Vertica-specific syntax and features. This connector can support features such as Vertica projections.
  • Authentication to the Vertica server.
  • Segmentation. We can use the way that Vertica segments data to inform our operations.
  • The ability to use other technology as an intermediary for data transfer, which is necessary for maximizing the performance of parallel loads.

This connector is built as a JAR file to be sourced in your Spark application. It is available through Maven Central, or you can build the JAR yourself with sbt assembly.
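If you pull the connector from Maven Central, the dependency can be declared in an sbt build along these lines (the group ID, artifact name, and version shown are illustrative placeholders; check Maven Central for the exact coordinates of the release you need):

```scala
// build.sbt — hypothetical coordinates; verify against Maven Central
libraryDependencies += "com.vertica.spark" % "vertica-spark" % "<version>"
```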

The connector relies on a distributed filesystem, such as HDFS, to act as a bridge between Spark and Vertica. This is done to allow for maximum performance at high scale.


Getting Started

To get started with using the connector, we'll need to make sure all the prerequisites are in place. These are:

  • A Vertica installation
  • An HDFS cluster or S3 bucket, for use as an intermediary between Spark and Vertica
  • A Spark application, either running locally for quick testing or running on a Spark cluster. If using S3, Spark must be running with Hadoop 3.3

For an easier quick test of the connector using a dockerized environment, see this guide for running our examples.

Vertica

Follow the Vertica Documentation for steps on installing Vertica.

Spark

There are several examples of Spark programs that use this connector in the examples directory.

The methods for creating a Spark cluster are documented here.

Once you have a Spark cluster, you can run such an application with spark-submit, including the connector JAR.

spark-submit --master spark://cluster-url.com:7077 --deploy-mode cluster sparkconnectorprototype-assembly-0.1.jar

HDFS Cluster

An HDFS setup can have various configurations. For details on what might best fit your infrastructure, the HDFS Architecture Guide is recommended.

For a quick start, you can check out our guide on setting up a single-node HDFS, or, if you just want a quick test run, use an HDFS container in Docker as documented in our contributing guide.

However you set up HDFS, Vertica needs a copy of the Hadoop configuration (the location of hdfs-site.xml and core-site.xml), and each Vertica node must have access to it. You can either copy that configuration to /etc/hadoop/conf on those machines, or tell Vertica where to find it like so:

ALTER DATABASE <database name> SET HadoopConfDir = '/hadoop/conf/location/';

Java

The Spark connector requires Java 8 (8u92 or later) or Java 11.

Scala

For the Spark connector, Spark 3.0.0 and above use Scala 2.12. You will need to use a compatible Scala version (2.12.x).

AWS S3

Follow our S3 user manual to learn how to use the connector with S3.

Connector Usage

Using the connector in Spark is straightforward. It requires the data source name, an options map, and if writing to Vertica, a Spark Save Mode.

import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Connector options; assumes an active SparkSession named spark
val opts = Map(
    "host" -> "vertica_hostname",
    "user" -> "vertica_user",
    "db" -> "db_name",
    "password" -> "db_password",
    "staging_fs_url" -> "hdfs://hdfs-url:7077/data",
    "logging_level" -> "ERROR",
    "table" -> "tablename"
  )

// Create data and put it in a dataframe
val schema = new StructType(Array(StructField("col1", IntegerType)))
val data = Seq(Row(77))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).coalesce(1)

val mode = SaveMode.ErrorIfExists

// Write the dataframe to Vertica
df.write.format("com.vertica.spark.datasource.VerticaSource").options(opts).mode(mode).save()

// Read it back from Vertica
val dfRead: DataFrame = spark.read.format("com.vertica.spark.datasource.VerticaSource").options(opts).load()

Note on permissions

Because the connector uses an intermediary file location between Spark and Vertica, both sides may need write access to that location.

The default permission (configurable via the 'file_permissions' option) is '700', meaning access only for the given user. This is the default for security reasons, but it will cause issues if the user running Spark and the database user are different.

There are a few ways to solve this:

  • Change the file_permissions option to a suitable value. Warning: setting this to '777' means any user with filesystem access could read the data being transferred. Setting it to '770' can work if the users on each side of the operation share a group.
  • Use the same user for Spark and the database.
  • Use export HADOOP_USER_NAME=<db-user> to have Spark's Hadoop communication use the same user as the DB.
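As a sketch of the first option, the permission change is just one more entry in the options map (all connection values below are placeholders):

```scala
// Base connector options (placeholder values)
val baseOpts = Map(
  "host" -> "vertica_hostname",
  "user" -> "vertica_user",
  "db" -> "db_name",
  "password" -> "db_password",
  "staging_fs_url" -> "hdfs://hdfs-url:7077/data",
  "table" -> "tablename"
)

// Loosen the staging-area permissions to user + group ("770"), which works
// when the Spark user and the database user share a group
val opts = baseOpts + ("file_permissions" -> "770")
```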

Connector Options

Below is a detailed list of connector options that are used in the options map:

| Configuration Option | Possible Values | Description | Required | Default Value |
|---|---|---|---|---|
| table | String | The name of the target Vertica table to save your Spark DataFrame. If specified as schema.table, the schema is parsed from the table name. | No (either this or query) | |
| query | String | Alternative to specifying the table name. Any SQL query that produces results, for example a join between two tables or an aggregation. Typically used for a read operation. | No (either this or table) | |
| db | String | The name of the Vertica database. | Yes | |
| user | String | The name of the Vertica user. This user must have CREATE and INSERT privileges in the Vertica schema. The schema defaults to "public", but may be changed using the dbschema option. | Yes | |
| password | String | The password for the Vertica user. | No if SSO | |
| host | String | The hostname of a Vertica node. This value is used to make the initial connection to Vertica and look up all the other Vertica node IP addresses. You can provide a specific IP address or a resolvable name such as myhostname.com. | Yes | |
| dbschema | String | The schema space for the Vertica table. | No | public |
| port | Int | The Vertica port. | No | 5433 |
| failed_rows_percent_tolerance | Number | The tolerance level for failed rows, as a percentage. For example, to fail the job if more than 10% of the rows are rejected, specify 0.10 for 10% tolerance. | No | 0.00 |
| strlen | Int | The string length. Use this option to increase (or decrease) the default length when saving Spark StringType to Vertica VARCHAR. Also used to determine whether to use VARCHAR or LONG VARCHAR. | No | 1024 |
| target_table_sql (previously target_table_ddl) | String | A SQL statement to run in Vertica before copying data. Usually used to create the target table to receive the data from Spark; the table this statement creates should match the table name you specify in the table option. Unlike query, this option is run before a write operation, typically with a custom statement to create a table. See Creating the Destination Table From Spark for more information. | No | |
| copy_column_list | String | A custom column list to supply to the Vertica COPY statement that loads the Spark data into Vertica ("foo, bar, baz" or "foo,bar,baz"). This option lets you adjust the data so that the Vertica table and the Spark dataframe do not need the same number of columns. See Copying Data to a Table With a Different Schema for more information. | No | |
| num_partitions (previously numpartitions) | Int | The number of Spark partitions used when reading from Vertica. Each corresponds to a task with its own JDBC connection. Performance testing has indicated that for the direct read method the ideal number of partitions is 4*N, where N is the number of nodes in the Vertica cluster. The most efficient partition count for the intermediate method may vary, requires testing, and may be a tradeoff of memory vs. time. | No | 1 per exported parquet file |
| staging_fs_url (previously hdfs_url) | URL String | The fully qualified path to a directory in HDFS or S3 used as a data staging area, for example hdfs://myhost:8020/data/test. For S3, the URL should begin with s3a://. The connector first saves the DataFrame in its entirety to this location before loading it into Vertica. The data is saved in Parquet format in a directory specific to each job; this directory is deleted when the job completes. Note that additional configuration is needed to use HDFS: you must first configure all nodes to use HDFS. See Configuring the hdfs Scheme. | Yes | |
| merge_key | String | A list of comma-separated column attributes to join the existing table and temporary table on when executing a merge statement ("foo, bar, baz" or "foo,bar,baz"). The merge key must be unique in the dataframe for the merge to work, and the number of columns in the dataframe must be less than or equal to the number of columns in the target table. If the dataframe has fewer columns than the target table and the column names differ, use copy_column_list to match the dataframe columns against the target table columns; note, however, that the merge_key column names must match in both schemas. | No | |
| kerberos_service_name | String | The Kerberos service name, as specified when creating the service principal (outlined here). | No (Yes if using Kerberos) | |
| kerberos_host_name | String | The Kerberos host name, as specified when creating the service principal (outlined here). | No (Yes if using Kerberos) | |
| jaas_config_name | String | The name of the JAAS configuration used for Kerberos authentication. | No | verticajdbc |
| tls_mode | "disable" / "require" / "verify-ca" / "verify-full" | When not set to "disable", connections with Vertica are encrypted. See Vertica's documentation for more details on what each mode does. | No | "disable" |
| key_store_path | String | The local path to a .JKS file containing your private keys and their corresponding certificate chains. | No | |
| key_store_password | String | The password protecting the keystore file. If individual keys are also encrypted, the keystore file password must match the password for a key within the keystore. | No | |
| trust_store_path | String | The local path to a .JKS truststore file containing certificates from authorities you trust. | No | |
| trust_store_password | String | The password protecting the truststore file. | No | |
| file_permissions | String | Unix file permissions used for the intermediary filestore; can be in octal format (e.g. 750) or user-group-other format (e.g. -rwxr--r--). | No | 700 |
| max_file_size_export_mb | Int | Vertica maximum export file size in MB. | No | 4096 |
| max_row_group_size_export_mb | Int | Vertica maximum row group file size in MB. | No | 16 |
| aws_access_key_id | String | The access key ID for S3. Specifying this option sets the access key ID at the session level. Alternatively, you can set spark.hadoop.fs.s3a.access.key in the Spark configuration or the environment variable AWS_ACCESS_KEY_ID. | No (Yes if aws_secret_access_key is specified) | |
| aws_secret_access_key | String | The secret access key for S3. Specifying this option sets the secret access key at the session level. Alternatively, you can set spark.hadoop.fs.s3a.secret.key in the Spark configuration or the environment variable AWS_SECRET_ACCESS_KEY. | No (Yes if aws_access_key_id is specified) | |
| aws_region | String | The AWS region for S3. Specifying this option sets the region at the session level. Alternatively, you can set the environment variable AWS_DEFAULT_REGION. | No | |
| aws_session_token | String | An AWS session token. Specifying this option sets the session token at the session level. Alternatively, you can set spark.hadoop.fs.s3a.session.token in the Spark configuration or the environment variable AWS_SESSION_TOKEN. | No | |
| aws_credentials_provider | String | The AWS credentials provider. For example, to use the IAMInstanceCredentialsProvider, specify "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider". Specifying this option sets the AWS credentials provider option in the Hadoop configuration. Alternatively, you can set spark.hadoop.fs.s3a.aws.credentials.provider in the Spark configuration. If left blank, Hadoop defaults to trying SimpleAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, and InstanceProfileCredentialsProvider in order. Note that we have only tested the connector with SimpleAWSCredentialsProvider and IAMInstanceCredentialsProvider, so other credential providers have limited support. | No | |
| aws_endpoint | String | The AWS endpoint to use, if not the default Amazon AWS instance. Can be used when interacting with a different data store that implements the S3 API. | No | |
| aws_enable_ssl | "true" / "false" | Whether SSL should be used. Defaults to always using SSL; only change this if using an endpoint that does not use HTTPS. This value cannot be set at the session level in Vertica, so it must be set separately with ALTER DATABASE <database name> SET AWSEnableHttps=0. | No | true |

Note: If you are using the S3 properties, the connector options take priority over the Spark configuration, which takes priority over the environment variables.
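As an illustration of how the query option replaces table for a read, here is a hypothetical options map (all values are placeholders):

```scala
// Read options driven by a SQL query instead of a table name.
// "table" and "query" are mutually exclusive, so only one appears.
val readOpts = Map(
  "host" -> "vertica_hostname",
  "user" -> "vertica_user",
  "db" -> "db_name",
  "password" -> "db_password",
  "staging_fs_url" -> "hdfs://hdfs-url:7077/data",
  "query" -> "SELECT a.id, b.total FROM a JOIN b ON a.id = b.id"
)
```

This map would then be passed to spark.read.format("com.vertica.spark.datasource.VerticaSource").options(readOpts).load() exactly as in the usage example earlier.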

Examples

If you would like to try out the connector, we have several example applications you can run in the examples folder.

Configuring Logging

The connector uses Spark's log4j configuration for logging. Logging is configured by updating the $SPARK_HOME/log4j.properties file on the worker nodes. The log4j.properties file must also be on the classpath.

Here is an example spark-submit command showing how to add Spark's default configuration folder to the classpath: spark-submit --master spark://spark:7077 --conf "spark.driver.extraClassPath={$SPARK_HOME}/conf/" --deploy-mode cluster app-assembly.jar --class Main where {$SPARK_HOME} is the $SPARK_HOME directory on the worker node.
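For reference, a minimal log4j.properties that quiets logging down to WARN might look like the sketch below (modeled on Spark's bundled log4j.properties.template; adjust the level and appender to your environment):

```
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```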

Additional guides

For information on how to configure Kerberos and TLS with the connector, see the KerberosUserManual.md and TLSConfiguration.md files in the root of this repository.

For information on tuning performance, see here in our performance-tests section.

Limitations

The connector supports versions of Spark between 3.0 and 3.1.1.

The connector supports basic Spark types. Complex types are not currently supported (arrays, maps, structs).

If using S3 rather than HDFS, the Spark cluster must be running with Hadoop 3.3. Our S3 user manual goes over how to configure this.

You may see an error such as: 21/05/31 17:35:21 ERROR DFSClient: Failed to close inode 16447

This is simply an issue with Hadoop closing file handles on files that have already been cleaned up by the connector. It does not cause any functional issues.
