From de9146f4b3c7cea1a593fb9c1c640868327d39b6 Mon Sep 17 00:00:00 2001
From: tobe
Date: Fri, 23 Jul 2021 11:41:49 +0800
Subject: [PATCH] feat: support iceberg for openmldb batch (#146)

* Add iceberg dependencies and set iceberg catalog from config
* Fix end2end window test case
* Reset the default value of openmldb batch config
* Add new APIs to import table in offline storage
---
 java/openmldb-batch/pom.xml                   |  6 ++
 .../openmldb/batch/OpenmldbBatchConfig.scala  |  6 ++
 .../openmldb/batch/api/OpenmldbSession.scala  | 65 ++++++++++++++++++-
 .../openmldb/batch/end2end/TestWindow.scala   |  7 +-
 4 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/java/openmldb-batch/pom.xml b/java/openmldb-batch/pom.xml
index 5128c95e782..b20a52bf3df 100644
--- a/java/openmldb-batch/pom.xml
+++ b/java/openmldb-batch/pom.xml
@@ -332,6 +332,12 @@
             <version>2.7.4</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark3-runtime</artifactId>
+            <version>0.11.1</version>
+        </dependency>
diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
index 1c83e215610..3fa652f51bf 100755
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
@@ -123,6 +123,12 @@ class OpenmldbBatchConfig extends Serializable {
   //@ConfigOption(name="openmldb.hybridse.jsdk.path", doc="The path of the HybridSE jsdk core file")
   var hybridseJsdkLibraryPath = ""
 
+  @ConfigOption(name="openmldb.hadoop.warehouse.path", doc="The path of the Hadoop warehouse")
+  var hadoopWarehousePath = ""
+
+  @ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of the Iceberg catalog")
+  val icebergCatalogName = "iceberg_catalog"
+
 }
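The two options above feed the Iceberg catalog registration added to OpenmldbSession below. As a minimal sketch, not part of the patch, this is how the warehouse path could be supplied when building a session, assuming the batch config loader reads the @ConfigOption names from the Spark conf under a "spark." prefix, as with the existing options; the conf key and HDFS URI are illustrative:

    import org.apache.spark.sql.SparkSession
    import com._4paradigm.openmldb.batch.api.OpenmldbSession

    // Assumed conf key: "spark." + the @ConfigOption name above.
    // The warehouse URI is a placeholder for a real HDFS path.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.openmldb.hadoop.warehouse.path", "hdfs://namenode:8020/iceberg_warehouse")
      .getOrCreate()

    val sess = new OpenmldbSession(spark)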
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") this.sparkSession = builder.appName("App") .master(sparkMaster) .getOrCreate() + + this.setDefaultSparkConfig() } this.sparkSession } } + def setDefaultSparkConfig(): Unit = { + val sparkConf = this.sparkSession.conf + // Set timezone + sparkConf.set("spark.sql.session.timeZone", config.timeZone) + + // Set Iceberg catalog + sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog") + sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop") + sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath) + } + /** * Read the file with get dataframe with Spark API. * @@ -208,4 +227,44 @@ class OpenmldbSession { sparkSession.stop() } + /** + * Create table and import data from DataFrame to offline storage. + */ + def importToOfflineStorage(databaseName: String, tableName: String, df: DataFrame): Unit = { + createOfflineTable(databaseName, tableName, df) + appendOfflineTable(databaseName, tableName, df) + } + + /** + * Create table in offline storage. + */ + def createOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = { + // TODO: Check if table exists + + logger.info("Register the table %s to create table in offline storage".format(tableName)) + df.createOrReplaceTempView(tableName) + + val hadoopConfiguration = new Configuration() + val hadoopCatalog = new HadoopCatalog(hadoopConfiguration, config.hadoopWarehousePath) + val icebergSchema = SparkSchemaUtil.schemaForTable(this.getSparkSession, tableName) + val partitionSpec = PartitionSpec.builderFor(icebergSchema).build() + val tableIdentifier = TableIdentifier.of(databaseName, tableName) + + // Create Iceberg table + hadoopCatalog.createTable(tableIdentifier, icebergSchema, partitionSpec) + } + + /** + * Append data from DataFrame to offline storage. 
diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
index e2bc5eac7a1..21a18f3dfc3 100644
--- a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
+++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
@@ -17,6 +17,7 @@
 package com._4paradigm.openmldb.batch.end2end
 
 import com._4paradigm.openmldb.batch.api.OpenmldbSession
+import com._4paradigm.openmldb.batch.utils.SparkUtil
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.FunSuite
@@ -54,11 +55,9 @@ class TestWindow extends FunSuite {
     val sqlText = "SELECT sum(trans_amount) OVER w AS w_sum_amount FROM t1 WINDOW w AS (PARTITION BY user ORDER BY trans_time ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)"
 
     val outputDf = sess.sql(sqlText)
-    sess.version()
-
-    //val sparksqlOutputDf = sess.sparksql(sqlText)
+    val sparksqlOutputDf = sess.sparksql(sqlText)
     // Notice that the sum column type is different for SparkSQL and SparkFE
-    //assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
+    assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
   }
 }
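Continuing the sketch, an imported table should also be readable back through the Spark catalog registered by setDefaultSparkConfig, which is exactly how appendOfflineTable resolves the qualified table name; the catalog, database, and table names below are illustrative:

    // Read the table via the Iceberg catalog (default name "iceberg_catalog")
    val readDf = sess.getSparkSession.table("iceberg_catalog.db1.t1")
    readDf.show()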