[GLUTEN-8327][CORE][Part-3] Introduce the ConfigEntry to gluten config #8431

Merged (2 commits) on Jan 7, 2025
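Before the file-by-file diff, a minimal sketch of the access patterns this PR standardizes on, as visible in the hunks below: options become `ConfigEntry` objects on `GlutenConfig`, typed reads go through `GlutenConfig.get` and `getConf`, and string-keyed APIs (`SparkConf.set`, `SQLConf.getConfString`/`setConfString`) go through `entry.key`. The entry names and calls are taken from the diff; the session setup is illustrative only and omits the usual Gluten plugin and memory settings.

```scala
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ConfigEntryUsageSketch {
  def main(args: Array[String]): Unit = {
    // String-keyed APIs now take `entry.key` instead of a bare string constant.
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("config-entry-sketch")
      .set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key, "0")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Typed reads go through the active GlutenConfig instance.
    val glutenConf: GlutenConfig = GlutenConfig.get
    val scanOffloaded: Boolean = glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED)

    // When only the key is at hand (e.g. in benchmarks), read the string form and parse it.
    val tableCacheEnabled = spark.sessionState.conf
      .getConfString(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "false")
      .toBoolean

    println(s"filescan offloaded: $scanOffloaded, columnar table cache: $tableCacheEnabled")
    spark.stop()
  }
}
```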
@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer
@@ -76,9 +76,11 @@ case class CachedColumnarBatch(
* -> Convert DefaultCachedBatch to InternalRow using vanilla Spark serializer
*/
// format: on
-class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHelper with Logging {
+class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
private lazy val rowBasedCachedBatchSerializer = new DefaultCachedBatchSerializer

+private def glutenConf: GlutenConfig = GlutenConfig.get
+
private def toStructType(schema: Seq[Attribute]): StructType = {
StructType(schema.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
}
@@ -108,14 +110,14 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
// `convertColumnarBatchToCachedBatch`, but the inside ColumnarBatch is not arrow-based.
// See: `InMemoryRelation.apply()`.
// So we should disallow columnar input if using vanilla Spark columnar scan.
-val noVanillaSparkColumnarScan = conf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
-!conf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
-conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
+val noVanillaSparkColumnarScan = glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
+!glutenConf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
+glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
schema) && noVanillaSparkColumnarScan
}

override def supportsColumnarOutput(schema: StructType): Boolean = {
-conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
+glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
}

override def convertInternalRowToCachedBatch(
@@ -17,6 +17,7 @@
package org.apache.gluten.test;

import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenConfig$;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
@@ -71,7 +72,7 @@ public Object ask(Object message) throws Exception {
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
-conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS(), "0");
+conf.set(GlutenConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
return conf;
}
}
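The Java call site above now reaches the entry through `GlutenConfig$.MODULE$`: from Java, a Scala `object`'s members are accessed via the compiler-generated singleton field `MODULE$`, the trailing `()` is the accessor for the `val`, and `.key()` returns the string key that `SparkConf.set` accepts. Below is a minimal Scala sketch of the assumed shape, using a hypothetical stand-in entry type and an illustrative key string rather than Gluten's real builder API.

```scala
// Hypothetical stand-in for the real ConfigEntry type, shown only to illustrate the shape.
final case class ConfigEntrySketch[T](key: String, default: T)

object GlutenConfigSketch {
  // A `val` on a Scala object. Callers reach it as:
  //   Scala: GlutenConfigSketch.COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key
  //   Java:  GlutenConfigSketch$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key()
  val COLUMNAR_VELOX_CONNECTOR_IO_THREADS: ConfigEntrySketch[Int] =
    ConfigEntrySketch("spark.gluten.sql.columnar.backend.velox.IOThreads", 0) // key string is illustrative
}
```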
@@ -33,11 +33,16 @@ object ColumnarTableCacheBenchmark extends SqlBasedBenchmark {

private def doBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
val benchmark = new Benchmark(name, cardinality, output = output)
-val flag = if (spark.sessionState.conf.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
-"enable"
-} else {
-"disable"
-}
+val flag =
+if (
+spark.sessionState.conf
+.getConfString(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key)
+.toBoolean
+) {
+"enable"
+} else {
+"disable"
+}
benchmark.addCase(s"$flag columnar table cache", 3)(_ => f)
benchmark.run()
}
@@ -53,7 +53,7 @@ abstract class HashAggregateExecBaseTransformer(
BackendsApiManager.getMetricsApiInstance.genHashAggregateTransformerMetrics(sparkContext)

protected def isCapableForStreamingAggregation: Boolean = {
-if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
+if (!glutenConf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
return false
}
if (groupingExpressions.isEmpty) {
@@ -271,7 +271,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
override def nodeName: String = s"WholeStageCodegenTransformer ($transformStageId)"

override def verboseStringWithOperatorId(): String = {
-val nativePlan = if (conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
+val nativePlan = if (glutenConf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
s"Native Plan:\n${nativePlanString()}"
} else {
""
@@ -315,7 +315,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

def doWholeStageTransform(): WholeStageTransformContext = {
val context = generateWholeStageTransformContext()
-if (conf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
+if (glutenConf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) {
wholeStageTransformerContext = Some(context)
}
context
@@ -118,7 +118,7 @@ case class WriteFilesExecTransformer(
}

private def getFinalChildOutput: Seq[Attribute] = {
-val metadataExclusionList = conf
+val metadataExclusionList = glutenConf
.getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
.split(",")
.map(_.trim)
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan}
*/
case class RemoveNativeWriteFilesSortAndProject() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
-if (!conf.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
+if (!GlutenConfig.get.getConf(GlutenConfig.REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT)) {
return plan
}

@@ -49,7 +49,7 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED, "true")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")
-.set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL, "INFO")
+.set(GlutenConfig.SOFT_AFFINITY_LOG_LEVEL.key, "INFO")

test("Soft Affinity Scheduler with duplicate reading detection") {
if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
@@ -31,8 +31,10 @@ class GlutenSparkSessionExtensionSuite
try {
session.range(2).write.format("parquet").mode("overwrite").saveAsTable("a")
def testWithFallbackSettings(scanFallback: Boolean, aggFallback: Boolean): Unit = {
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED, scanFallback)
-session.sessionState.conf.setConf(GlutenConfig.COLUMNAR_HASHAGG_ENABLED, aggFallback)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, scanFallback.toString)
+session.sessionState.conf
+.setConfString(GlutenConfig.COLUMNAR_HASHAGG_ENABLED.key, aggFallback.toString)
val df = session.sql("SELECT max(id) FROM a")
val newDf = DummyFilterColmnarHelper.dfWithDummyFilterColumnar(
session,
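Across the test suites above, per-session toggles switch from the typed `setConf(entry, value)` to the string-keyed `setConfString(entry.key, value.toString)`, since the entries are no longer Spark `SQLConf` entries. A minimal sketch of that pattern, assuming a live `SparkSession`; the helper name, restore logic, and assumed default are illustrative and not part of the PR.

```scala
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.sql.SparkSession

// Toggle a Gluten flag by its string key for the duration of a block, then restore it.
def withGlutenFlag[T](session: SparkSession, value: Boolean)(body: => T): T = {
  val conf = session.sessionState.conf
  val key = GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key
  val previous = conf.getConfString(key, "true") // "true" is an assumed default
  conf.setConfString(key, value.toString)
  try body
  finally conf.setConfString(key, previous)
}
```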