Skip to content

Commit

Permalink
[SPARK-33049][CORE] Decommission shuffle block test is flaky
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Increase the listener bus event length, syncrhonize the addition of blocks modified to the array list.

### Why are the changes needed?

This test appears flaky in Jenkins (can not repro locally). Given that the index file made it through and the index file is only transferred after the data file, the only two reasons I could come up with an interminentent failure here are with the listenerbus dropping a message or the two block change messages being received at the same time.

### Does this PR introduce _any_ user-facing change?

No (test only).

### How was this patch tested?

The tests still pass on my machine but they did before. We'll need to run it through jenkins a few times first.

Closes apache#29929 from holdenk/fix-.BlockManagerDecommissionIntegrationSuite.

Authored-by: Holden Karau <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
holdenk authored and dongjoon-hyun committed Oct 3, 2020
1 parent 37c806a commit db420f7
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
// Since we use the bus for testing we don't want to drop any messages
.set(config.LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1000000)
// Just replicate blocks quickly during testing, there isn't another
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
Expand Down Expand Up @@ -137,7 +139,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
taskEndEvents.add(taskEnd)
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = synchronized {
blocksUpdated.append(blockUpdated)
}

Expand Down

0 comments on commit db420f7

Please sign in to comment.