
feat: support iceberg for openmldb batch (4paradigm#146)
* Add iceberg dependencies and set iceberg catalog from config

* Fix end2end window test case

* Reset the default value of openmldb batch config

* Add new APIs to import table in offline storage
tobegit3hub authored Jul 23, 2021
1 parent 51585d1 commit de9146f
Showing 4 changed files with 77 additions and 7 deletions.
6 changes: 6 additions & 0 deletions java/openmldb-batch/pom.xml
@@ -332,6 +332,12 @@
<version>2.7.4</version>
</dependency>

+<!-- Iceberg -->
+<dependency>
+<groupId>org.apache.iceberg</groupId>
+<artifactId>iceberg-spark3-runtime</artifactId>
+<version>0.11.1</version>
+</dependency>

</dependencies>

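Note: iceberg-spark3-runtime is Iceberg's shaded all-in-one jar for Spark 3; it bundles the org.apache.iceberg.spark.SparkCatalog and IcebergSparkSessionExtensions classes that the session changes below rely on, so no additional Iceberg artifacts are needed on the classpath.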
6 changes: 6 additions & 0 deletions java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala
@@ -123,6 +123,12 @@ class OpenmldbBatchConfig extends Serializable {
//@ConfigOption(name="openmldb.hybridse.jsdk.path", doc="The path of HybridSE jsdk core file path")
var hybridseJsdkLibraryPath = ""

+@ConfigOption(name="openmldb.hadoop.warehouse.path", doc="The path of Hadoop warehouse")
+var hadoopWarehousePath = ""
+
+@ConfigOption(name="openmldb.iceberg.catalog.name", doc="The name of Iceberg catalog")
+val icebergCatalogName = "iceberg_catalog"

}


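Both new options are read from the Spark configuration by OpenmldbBatchConfig.fromSparkSession. Below is a minimal sketch of supplying them when building a session; the "spark."-prefixed key name and the warehouse URI are illustrative assumptions, not part of this commit.

import com._4paradigm.openmldb.batch.api.OpenmldbSession
import org.apache.spark.sql.SparkSession

// Hypothetical wiring: assumes the config loader resolves @ConfigOption names
// under a "spark." prefix; the HDFS URI is a placeholder.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.openmldb.hadoop.warehouse.path", "hdfs://namenode:9000/iceberg_warehouse")
  .getOrCreate()

// Picks up the options via OpenmldbBatchConfig.fromSparkSession
val sess = new OpenmldbSession(spark)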
65 changes: 62 additions & 3 deletions java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala
@@ -18,6 +18,11 @@ package com._4paradigm.openmldb.batch.api

import com._4paradigm.openmldb.batch.{OpenmldbBatchConfig, SparkPlanner}
import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.iceberg.PartitionSpec
+import org.apache.iceberg.catalog.TableIdentifier
+import org.apache.iceberg.hadoop.HadoopCatalog
+import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -52,7 +57,7 @@ class OpenmldbSession {
this()
this.sparkSession = sparkSession
this.config = OpenmldbBatchConfig.fromSparkSession(sparkSession)
-this.sparkSession.conf.set("spark.sql.session.timeZone", config.timeZone)
+this.setDefaultSparkConfig()
}

/**
@@ -74,18 +79,32 @@
val builder = SparkSession.builder()

// TODO: Need to set for official Spark 2.3.0 jars
-logger.debug("Set spark.hadoop.yarn.timeline-service.enabled as false")
-builder.config("spark.hadoop.yarn.timeline-service.enabled", value = false)
+//logger.debug("Set spark.hadoop.yarn.timeline-service.enabled as false")
+//builder.config("spark.hadoop.yarn.timeline-service.enabled", value = false)
+builder.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

this.sparkSession = builder.appName("App")
.master(sparkMaster)
.getOrCreate()

+this.setDefaultSparkConfig()
}

this.sparkSession
}
}

+def setDefaultSparkConfig(): Unit = {
+val sparkConf = this.sparkSession.conf
+// Set timezone
+sparkConf.set("spark.sql.session.timeZone", config.timeZone)
+
+// Set Iceberg catalog
+sparkConf.set("spark.sql.catalog.%s".format(config.icebergCatalogName), "org.apache.iceberg.spark.SparkCatalog")
+sparkConf.set("spark.sql.catalog.%s.type".format(config.icebergCatalogName), "hadoop")
+sparkConf.set("spark.sql.catalog.%s.warehouse".format(config.icebergCatalogName), this.config.hadoopWarehousePath)
+}

/**
* Read the file with get dataframe with Spark API.
*
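With the default catalog name, setDefaultSparkConfig amounts to the standard Iceberg catalog registration for Spark 3: spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog, spark.sql.catalog.iceberg_catalog.type = hadoop, and spark.sql.catalog.iceberg_catalog.warehouse = <openmldb.hadoop.warehouse.path>, i.e. a HadoopCatalog rooted at the configured warehouse directory.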
@@ -208,4 +227,44 @@ class OpenmldbSession {
sparkSession.stop()
}

+/**
+* Create table and import data from DataFrame to offline storage.
+*/
+def importToOfflineStorage(databaseName: String, tableName: String, df: DataFrame): Unit = {
+createOfflineTable(databaseName, tableName, df)
+appendOfflineTable(databaseName, tableName, df)
+}
+
+/**
+* Create table in offline storage.
+*/
+def createOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+// TODO: Check if table exists
+
+logger.info("Register the table %s to create table in offline storage".format(tableName))
+df.createOrReplaceTempView(tableName)
+
+val hadoopConfiguration = new Configuration()
+val hadoopCatalog = new HadoopCatalog(hadoopConfiguration, config.hadoopWarehousePath)
+val icebergSchema = SparkSchemaUtil.schemaForTable(this.getSparkSession, tableName)
+val partitionSpec = PartitionSpec.builderFor(icebergSchema).build()
+val tableIdentifier = TableIdentifier.of(databaseName, tableName)
+
+// Create Iceberg table
+hadoopCatalog.createTable(tableIdentifier, icebergSchema, partitionSpec)
+}
+
+/**
+* Append data from DataFrame to offline storage.
+*/
+def appendOfflineTable(databaseName: String, tableName: String, df: DataFrame): Unit = {
+logger.info("Register the table %s to append data in offline storage".format(tableName))
+df.createOrReplaceTempView(tableName)
+
+val icebergTableName = "%s.%s.%s".format(config.icebergCatalogName, databaseName, tableName)
+
+// Insert parquet to Iceberg table
+this.getSparkSession.table(tableName).writeTo(icebergTableName).append()
+}
+
}
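A hypothetical end-to-end use of the new import APIs, assuming a session configured with a warehouse path as sketched earlier; the database name, table name, and input path are illustrative.

// Import an existing DataFrame into the Iceberg-backed offline storage.
val inputDf = sess.getSparkSession.read.parquet("/path/to/input") // placeholder input
sess.importToOfflineStorage("db1", "t1", inputDf)

// The imported table is then addressable through the configured catalog name.
sess.getSparkSession.sql("SELECT COUNT(*) FROM iceberg_catalog.db1.t1").show()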
7 changes: 3 additions & 4 deletions java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/end2end/TestWindow.scala
@@ -17,6 +17,7 @@
package com._4paradigm.openmldb.batch.end2end

import com._4paradigm.openmldb.batch.api.OpenmldbSession
+import com._4paradigm.openmldb.batch.utils.SparkUtil
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.scalatest.FunSuite
@@ -54,11 +55,9 @@ class TestWindow extends FunSuite {
val sqlText = "SELECT sum(trans_amount) OVER w AS w_sum_amount FROM t1 WINDOW w AS (PARTITION BY user ORDER BY trans_time ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)"
val outputDf = sess.sql(sqlText)

-sess.version()
-
-//val sparksqlOutputDf = sess.sparksql(sqlText)
+val sparksqlOutputDf = sess.sparksql(sqlText)
// Notice that the sum column type is different for SparkSQL and SparkFE
-//assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
+assert(SparkUtil.approximateDfEqual(outputDf.getSparkDf(), sparksqlOutputDf, false))
}

}
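Re-enabling this assertion is the "Fix end2end window test case" item from the commit message; the final false argument to SparkUtil.approximateDfEqual presumably relaxes strict schema comparison, since the sum column type differs between SparkSQL and SparkFE as the comment notes.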
