Commit 5db78e6

tdas authored and rxin committed
[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. Moreover, it keeps failing indefinitely: even after the driver learns of B's demise, A continues trying to replicate to B. The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active) but never updates it after B is dead. This affects Spark Streaming, as its receiver uses block replication.

The solution in this patch adds the following.

- Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out the driver BlockManager.
- Refactored BlockManager's replication code to handle peer caching correctly.
  + The peer for replication is randomly selected. This is different from past behavior, where for a node A, a node B was deterministically chosen for the lifetime of the application.
  + If replication to a node fails, the peers are re-fetched.
  + The peer cache has a TTL (spark.storage.cachedPeersTtl, default 60 seconds) to enable discovery of newly added peers and their use for replication.
- Refactored the use of <driver> in BlockManager into a new method BlockManagerId.isDriver.
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in the performance of Spark workloads where replication is not used.

@andrewor14 @JoshRosen

Author: Tathagata Das <[email protected]>

Closes apache#2366 from tdas/replication-fix and squashes the following commits:

9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite.
0661773 [Tathagata Das] Minor changes based on PR comments.
a55a65c [Tathagata Das] Added a unit test to test replication behavior.
012afa3 [Tathagata Das] Bug fix
89f91a0 [Tathagata Das] Minor change.
68e2c72 [Tathagata Das] Made replication peer selection logic more efficient.
08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id
3821ab9 [Tathagata Das] Fixes based on PR comments.
08e5646 [Tathagata Das] More minor changes.
d402506 [Tathagata Das] Fixed imports.
4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager.
7598f91 [Tathagata Das] Minor changes.
03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition.
d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn.
9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug.
af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite
1 parent c6469a0 commit 5db78e6
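
For context, the two configuration keys this patch reads (spark.storage.cachedPeersTtl and spark.storage.maxReplicationFailures) can be set through SparkConf like any other Spark setting. The sketch below is illustrative only: the application name, master URL, and values are assumptions, not recommendations. It simply persists an RDD with a replicated storage level so the changed peer-selection path is exercised.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ReplicationTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("replication-tuning-sketch")  // hypothetical app name
      .setMaster("local[2]")                    // hypothetical master URL
      // TTL (ms) of the cached peer list before it is re-fetched from the driver
      .set("spark.storage.cachedPeersTtl", "1000")
      // Number of replication failures tolerated before giving up on a block
      .set("spark.storage.maxReplicationFailures", "1")

    val sc = new SparkContext(conf)
    // MEMORY_ONLY_2 requests replication factor 2, which goes through
    // BlockManager.replicate() and hence the peer cache changed in this commit.
    val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY_2)
    println(rdd.count())
    sc.stop()
  }
}

In local mode there are no remote peers, so replicate() finds an empty peer list and just logs a warning; on a real cluster the block would be copied to one other executor.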

File tree

8 files changed: +544 −49 lines


core/src/main/scala/org/apache/spark/storage/BlockManager.scala

+104 −18

@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.concurrent.ExecutionContext.Implicits.global
 
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
@@ -112,6 +113,11 @@ private[spark] class BlockManager(
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
 
+  // Fields related to peer block managers that are necessary for block replication
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
+  private val peerFetchLock = new Object
+  private var lastPeerFetchTime = 0L
+
   initialize()
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -787,31 +793,111 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node.
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
+   * Replicate block to another node. Note that this is a blocking call that returns after
+   * the block has been replicated.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.currentTimeMillis
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So assuming the list of peers does not change and no replication failures,
+    // if there are multiple attempts in the same node to replicate the same block,
+    // the same set of peers will be selected.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication had failed, then force update the cached list of peers and remove the peers
+      // that have already been used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (!peersForReplication.isEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")
 
-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it.
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from the driver and then pick another random peer for replication. Also
+    // temporarily blacklist the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.currentTimeMillis
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
+              .format(System.currentTimeMillis - onePeerStartTime))
+            peersReplicatedTo += peer
+            peersForReplication -= peer
+            replicationFailed = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true // specified number of peers have been replicated to
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              replicationFailed = true
+              peersFailedToReplicateTo += peer
+              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
+                done = true
+              }
          }
        case None => // no peer left to replicate to
+          done = true
       }
-
-      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
-        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+    }
+    val timeTakeMs = (System.currentTimeMillis - startTime)
+    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+      s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
+    if (peersReplicatedTo.size < numPeersToReplicateTo) {
+      logWarning(s"Block $blockId replicated to only " +
+        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
     }
   }
 

core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

+2

@@ -59,6 +59,8 @@ class BlockManagerId private (
 
   def port: Int = port_
 
+  def isDriver: Boolean = (executorId == "<driver>")
+
   override def writeExternal(out: ObjectOutput) {
     out.writeUTF(executorId_)
     out.writeUTF(host_)

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

+2 −7

@@ -84,13 +84,8 @@ class BlockManagerMaster(
   }
 
   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
+  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   /**

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

+13 −16

@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetLocationsMultipleBlockIds(blockIds) =>
      sender ! getLocationsMultipleBlockIds(blockIds)

-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
+    case GetPeers(blockManagerId) =>
+      sender ! getPeers(blockManagerId)

    case GetMemoryStatus =>
      sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
-    // TODO: Consolidate usages of <driver>
     import context.dispatcher
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
-      removeFromDriver || info.blockManagerId.executorId != "<driver>"
+      removeFromDriver || !info.blockManagerId.isDriver
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     val minSeenTime = now - slaveTimeout
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
+      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
          + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
        toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.executorId == "<driver>" && !isLocal
+      blockManagerId.isDriver && !isLocal
     } else {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
      tachyonSize: Long) {

    if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "<driver>" && !isLocal) {
+      if (blockManagerId.isDriver && !isLocal) {
        // We intentionally do not register the master (except in local mode),
        // so we should not indicate failure.
        sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
-    if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet
+    if (blockManagerIds.contains(blockManagerId)) {
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+    } else {
+      Seq.empty
     }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
418415

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

+1 −1

@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

+1 −1

@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
       assert(statuses.size === 1)
       statuses.head match { case (bm, status) =>
-        assert(bm.executorId === "<driver>", "Block should only be on the driver")
+        assert(bm.isDriver, "Block should only be on the driver")
         assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
         assert(status.memSize > 0, "Block should be in memory store on the driver")
         assert(status.diskSize === 0, "Block should not be in disk store on the driver")
