# [SPARK-36246][CORE][TEST] GHA WorkerDecommissionExtended flake
### What changes were proposed in this pull request?

GHA probably doesn't have the same resources as Jenkins, so move down from 5 to 3 executors (with 384 MB instead of 512 MB each) and give them a bit more time (60s → 80s) to come up.
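
For context, the bracketed values in a `local-cluster` master URL are `[numWorkers, coresPerWorker, memoryPerWorkerMB]`, so this drops the test cluster from 5 workers at 512 MB each to 3 workers at 384 MB each. A minimal sketch of the reduced configuration, written with the literal config keys rather than the constants the suite imports:

```scala
import org.apache.spark.SparkConf

// 3 workers with 1 core and 384 MB each, a footprint small enough for a GHA runner.
val conf = new SparkConf()
  .setAppName("WorkerDecommissionExtendedSuite")
  .setMaster("local-cluster[3,1,384]")
  .set("spark.executor.memory", "384m")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "3")
  .set("spark.decommission.enabled", "true")
```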

### Why are the changes needed?

The test is timing out in GHA.

### Does this PR introduce _any_ user-facing change?
No, this is a test-only change.

### How was this patch tested?

Run through GHA and verify there is no OOM during WorkerDecommissionExtendedSuite.

Closes apache#33467 from holdenk/SPARK-36246-WorkerDecommissionExtendedSuite-flakes-in-GHA.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
2 people authored and HyukjinKwon committed Jul 22, 2021
1 parent dcc0aaa commit 89a8319
Showing 1 changed file with 6 additions and 6 deletions.
WorkerDecommissionExtendedSuite.scala

```diff
@@ -31,17 +31,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[5,1,512]")
-    .set(EXECUTOR_MEMORY, "512m")
+    .set(SPARK_MASTER, "local-cluster[3,1,384]")
+    .set(EXECUTOR_MEMORY, "384m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
-    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)
     .set(DECOMMISSION_ENABLED, true)

   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 3, 80000)
       val rdd1 = sc.parallelize(1 to 10, 2)
       val rdd2 = rdd1.map(x => (1, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
@@ -53,10 +53,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

-  test("Decommission 4 executors from 5 executors in total") {
+  test("Decommission 2 executors from 3 executors in total") {
     sc = new SparkContext(conf)
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 3, 80000)
       val rdd1 = sc.parallelize(1 to 100000, 200)
       val rdd2 = rdd1.map(x => (x % 100, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
```
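
`TestUtils.waitUntilExecutorsUp(sc, n, timeoutMillis)` polls until `n` executors have registered with the driver, failing once the timeout elapses, so the bump from 60000 to 80000 ms simply gives the slower GHA workers more headroom. A hedged sketch of such a poll loop against the public `SparkStatusTracker` API (`waitForExecutors` is a hypothetical stand-in, not Spark's actual implementation):

```scala
import java.util.concurrent.TimeoutException
import org.apache.spark.SparkContext

// Poll until `numExecutors` executors are registered, or fail after `timeoutMillis`.
def waitForExecutors(sc: SparkContext, numExecutors: Int, timeoutMillis: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  // getExecutorInfos also returns an entry for the driver, hence `numExecutors + 1`.
  while (sc.statusTracker.getExecutorInfos.length < numExecutors + 1) {
    if (System.currentTimeMillis() > deadline) {
      throw new TimeoutException(
        s"$numExecutors executors did not come up within $timeoutMillis ms")
    }
    Thread.sleep(10)
  }
}
```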
