Skip to content

Commit

Permalink
Introduce the ConfigEntry to gluten config
Browse files (browse the repository at this point in the history)
  • Loading branch information
yikf committed Jan 6, 2025
1 parent 55ab10d commit 37bb23f
Show file tree
Hide file tree
Showing 17 changed files with 719 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer
Expand Down Expand Up @@ -76,9 +76,11 @@ case class CachedColumnarBatch(
* -> Convert DefaultCachedBatch to InternalRow using vanilla Spark serializer
*/
// format: on
class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHelper with Logging {
class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
private lazy val rowBasedCachedBatchSerializer = new DefaultCachedBatchSerializer

private def glutenConf: GlutenConfig = GlutenConfig.get

private def toStructType(schema: Seq[Attribute]): StructType = {
StructType(schema.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
}
Expand Down Expand Up @@ -108,14 +110,14 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
// `convertColumnarBatchToCachedBatch`, but the inside ColumnarBatch is not arrow-based.
// See: `InMemoryRelation.apply()`.
// So we should disallow columnar input if using vanilla Spark columnar scan.
val noVanillaSparkColumnarScan = conf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
!conf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
val noVanillaSparkColumnarScan = glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
!glutenConf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
schema) && noVanillaSparkColumnarScan
}

override def supportsColumnarOutput(schema: StructType): Boolean = {
conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
}

override def convertInternalRowToCachedBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.gluten.test;

import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.config.GlutenConfig$;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Object ask(Object message) throws Exception {
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS(), "0");
conf.set(GlutenConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
return conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ object ColumnarTableCacheBenchmark extends SqlBasedBenchmark {

private def doBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
val benchmark = new Benchmark(name, cardinality, output = output)
val flag = if (spark.sessionState.conf.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
"enable"
} else {
"disable"
}
val flag =
if (
spark.sessionState.conf
.getConfString(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key)
.toBoolean
) {
"enable"
} else {
"disable"
}
benchmark.addCase(s"$flag columnar table cache", 3)(_ => f)
benchmark.run()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ abstract class HashAggregateExecBaseTransformer(
BackendsApiManager.getMetricsApiInstance.genHashAggregateTransformerMetrics(sparkContext)

protected def isCapableForStreamingAggregation: Boolean = {
if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
if (!glutenConf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
return false
}
if (groupingExpressions.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
override def nodeName: String = s"WholeStageCodegenTransformer ($transformStageId)"

override def verboseStringWithOperatorId(): String = {
val nativePlan = if (conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
val nativePlan = if (glutenConf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
s"Native Plan:\n${nativePlanString()}"
} else {
""
Expand Down Expand Up @@ -315,7 +315,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

def doWholeStageTransform(): WholeStageTransformContext = {
val context = generateWholeStageTransformContext()
if (conf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
if (glutenConf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
wholeStageTransformerContext = Some(context)
}
context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ case class WriteFilesExecTransformer(
}

private def getFinalChildOutput: Seq[Attribute] = {
val metadataExclusionList = conf
val metadataExclusionList = glutenConf
.getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
.split(",")
.map(_.trim)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan}
*/
case class RemoveNativeWriteFilesSortAndProject() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
if (!GlutenConfig.get.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
return plan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED, "true")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")
.set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL, "INFO")
.set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL.key, "INFO")

test("Soft Affinity Scheduler with duplicate reading detection") {
if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
session.sessionState.conf
.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
Expand Down
Loading

0 comments on commit 37bb23f

Please sign in to comment.