
Commit 234de92

[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data
As part of the initiative to prevent data loss on streaming driver failure, this JIRA tracks the subtask of implementing a ReceivedBlockHandler that abstracts the storage of received data blocks. The default implementation preserves the current behavior of storing data in the BlockManager; the optional implementation stores data in both the BlockManager and a write ahead log.

Author: Tathagata Das <[email protected]>

Closes apache#2940 from tdas/driver-ha-rbh and squashes the following commits:

78a4aaa [Tathagata Das] Fixed bug causing test failures.
f192f47 [Tathagata Das] Fixed import order.
df5f320 [Tathagata Das] Updated code to use ReceivedBlockStoreResult as the return type for handler's storeBlock
33c30c9 [Tathagata Das] Added license, and organized imports.
2f025b3 [Tathagata Das] Updates based on PR comments.
18aec1e [Tathagata Das] Moved ReceivedBlockInfo back into spark.streaming.scheduler package
95a4987 [Tathagata Das] Added ReceivedBlockHandler and its associated tests
1 parent d932719 commit 234de92

11 files changed: +603 −70 lines
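In brief, the commit replaces direct BlockManager calls in the receiver path with a pluggable handler. A hedged sketch of the resulting control flow, using the types introduced in the diffs below (the surrounding values -- conf, blockManager, streamId, storageLevel, hadoopConf, checkpointDir, blockId, dataIterator -- are assumed to be in scope):

// Sketch only: mirrors the selection logic added to ReceiverSupervisorImpl below.
val handler: ReceivedBlockHandler =
  if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
    // Optional behavior: store to the BlockManager and a write ahead log
    new WriteAheadLogBasedBlockHandler(
      blockManager, streamId, storageLevel, conf, hadoopConf, checkpointDir)
  } else {
    // Default behavior: BlockManager only, as before this commit
    new BlockManagerBasedBlockHandler(blockManager, storageLevel)
  }

// Every pushed block now goes through the handler and yields store metadata
val result: ReceivedBlockStoreResult = handler.storeBlock(blockId, IteratorBlock(dataIterator))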

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

+4 −3

@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.SparkException
 
 /**
  * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     if (validTime >= graph.startTime) {
       val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
       receivedBlockInfo(validTime) = blockInfo
-      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+      Some(new BlockRDD[T](ssc.sc, Array.empty))
     }
   }
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala

+35 −0

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
+
+/** Trait representing a received block */
+private[streaming] sealed trait ReceivedBlock
+
+/** Class representing a block received as an ArrayBuffer */
+private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
+
+/** Class representing a block received as an Iterator */
+private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
+
+/** Class representing a block received as a ByteBuffer */
+private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
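ReceivedBlock is a small sealed ADT, so handlers can pattern-match exhaustively over the three forms in which a receiver hands over data. A hedged sketch (the describe helper is illustrative only, not part of the commit):

// Illustrative helper: exhaustive match over the sealed ReceivedBlock hierarchy above.
def describe(block: ReceivedBlock): String = block match {
  case ArrayBufferBlock(buf) => s"array buffer with ${buf.size} records"
  case IteratorBlock(_)      => "iterator (record count unknown until consumed)"
  case ByteBufferBlock(bb)   => s"byte buffer of ${bb.remaining()} bytes"
}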
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

+193 −0

@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.{existentials, postfixOps}
+
+import WriteAheadLogBasedBlockHandler._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.Utils
+
+/** Trait that represents the metadata related to storage of blocks */
+private[streaming] trait ReceivedBlockStoreResult {
+  def blockId: StreamBlockId  // Any implementation of this trait will store a block id
+}
+
+/** Trait that represents a class that handles the storage of blocks received by a receiver */
+private[streaming] trait ReceivedBlockHandler {
+
+  /** Store a received block with the given block id and return related metadata */
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
+
+  /** Clean up blocks older than the given threshold time */
+  def cleanupOldBlock(threshTime: Long)
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
+ */
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+  extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks into a block manager with the specified storage level.
+ */
+private[streaming] class BlockManagerBasedBlockHandler(
+    blockManager: BlockManager, storageLevel: StorageLevel)
+  extends ReceivedBlockHandler with Logging {
+
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+    val putResult: Seq[(BlockId, BlockStatus)] = block match {
+      case ArrayBufferBlock(arrayBuffer) =>
+        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+      case IteratorBlock(iterator) =>
+        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+      case ByteBufferBlock(byteBuffer) =>
+        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+      case o =>
+        throw new SparkException(
+          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
+    }
+    if (!putResult.map { _._1 }.contains(blockId)) {
+      throw new SparkException(
+        s"Could not store $blockId to block manager with storage level $storageLevel")
+    }
+    BlockManagerBasedStoreResult(blockId)
+  }
+
+  def cleanupOldBlock(threshTime: Long) {
+    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
+    // of BlockRDDs.
+  }
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler]]
+ */
+private[streaming] case class WriteAheadLogBasedStoreResult(
+    blockId: StreamBlockId,
+    segment: WriteAheadLogFileSegment
+  ) extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks in both a write ahead log and a block manager.
+ */
+private[streaming] class WriteAheadLogBasedBlockHandler(
+    blockManager: BlockManager,
+    streamId: Int,
+    storageLevel: StorageLevel,
+    conf: SparkConf,
+    hadoopConf: Configuration,
+    checkpointDir: String,
+    clock: Clock = new SystemClock
+  ) extends ReceivedBlockHandler with Logging {
+
+  private val blockStoreTimeout = conf.getInt(
+    "spark.streaming.receiver.blockStoreTimeout", 30).seconds
+  private val rollingInterval = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
+  private val maxFailures = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
+
+  // Manages rolling log files
+  private val logManager = new WriteAheadLogManager(
+    checkpointDirToLogDir(checkpointDir, streamId),
+    hadoopConf, rollingInterval, maxFailures,
+    callerName = this.getClass.getSimpleName,
+    clock = clock
+  )
+
+  // For processing futures used in parallel block storing into block manager and write ahead log
+  // # threads = 2, so that both writing to BM and WAL can proceed in parallel
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+    Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+
+  /**
+   * This implementation stores the block into the block manager as well as a write ahead log.
+   * It does this in parallel, using Scala Futures, and returns only after the block has
+   * been stored in both places.
+   */
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    // Serialize the block so that it can be inserted into both
+    val serializedBlock = block match {
+      case ArrayBufferBlock(arrayBuffer) =>
+        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+      case IteratorBlock(iterator) =>
+        blockManager.dataSerialize(blockId, iterator)
+      case ByteBufferBlock(byteBuffer) =>
+        byteBuffer
+      case _ =>
+        throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
+    }
+
+    // Store the block in block manager
+    val storeInBlockManagerFuture = Future {
+      val putResult =
+        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+      if (!putResult.map { _._1 }.contains(blockId)) {
+        throw new SparkException(
+          s"Could not store $blockId to block manager with storage level $storageLevel")
+      }
+    }
+
+    // Store the block in write ahead log
+    val storeInWriteAheadLogFuture = Future {
+      logManager.writeToLog(serializedBlock)
+    }
+
+    // Combine the futures, wait for both to complete, and return the write ahead log segment
+    val combinedFuture = for {
+      _ <- storeInBlockManagerFuture
+      fileSegment <- storeInWriteAheadLogFuture
+    } yield fileSegment
+    val segment = Await.result(combinedFuture, blockStoreTimeout)
+    WriteAheadLogBasedStoreResult(blockId, segment)
+  }
+
+  def cleanupOldBlock(threshTime: Long) {
+    logManager.cleanupOldLogs(threshTime)
+  }
+
+  def stop() {
+    logManager.stop()
+  }
+}
+
+private[streaming] object WriteAheadLogBasedBlockHandler {
+  def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
+    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
+  }
+}
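The notable part of WriteAheadLogBasedBlockHandler is its concurrency pattern: two independent writes are kicked off as Futures on a two-thread pool, joined with a for-comprehension, and awaited with a timeout. A stripped-down, hedged sketch of the same pattern (the writeToBlockManager and writeToLog bodies are placeholders, not the real calls):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Two threads so the BlockManager write and the WAL write proceed in parallel
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

def writeToBlockManager(): Unit = { /* placeholder for blockManager.putBytes(...) */ }
def writeToLog(): String = { /* placeholder for logManager.writeToLog(...) */ "segment" }

val bmFuture = Future { writeToBlockManager() }   // starts immediately
val walFuture = Future { writeToLog() }           // runs concurrently with bmFuture

// Fail if either write fails; keep only the WAL's return value (the file segment)
val combined = for { _ <- bmFuture; segment <- walFuture } yield segment
val segment = Await.result(combined, 30.seconds)

Because both Futures are created before the for-comprehension, the two writes genuinely overlap; the comprehension only sequences the waiting, not the work.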

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

+51 −37

@@ -25,16 +25,13 @@ import scala.concurrent.Await
 
 import akka.actor.{Actor, Props}
 import akka.pattern.ask
-
 import com.google.common.base.Throwables
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.scheduler.DeregisterReceiver
-import org.apache.spark.streaming.scheduler.AddBlock
-import org.apache.spark.streaming.scheduler.RegisterReceiver
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogFileSegment
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
  */
 private[streaming] class ReceiverSupervisorImpl(
     receiver: Receiver[_],
-    env: SparkEnv
+    env: SparkEnv,
+    hadoopConf: Configuration,
+    checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
-  private val blockManager = env.blockManager
+  private val receivedBlockHandler: ReceivedBlockHandler = {
+    if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+      if (checkpointDirOption.isEmpty) {
+        throw new SparkException(
+          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+          "See documentation for more details.")
+      }
+      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
+    } else {
+      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
+    }
+  }
 
-  private val storageLevel = receiver.storageLevel
 
   /** Remote Akka actor for the ReceiverTracker */
   private val trackerActor = {
@@ -105,47 +116,50 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
   def pushArrayBuffer(
       arrayBuffer: ArrayBuffer[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
     ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
-    reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
+    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
   }
 
   /** Store an iterator of received data as a data block into Spark's memory. */
   def pushIterator(
       iterator: Iterator[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
    ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
-    reportPushedBlock(blockId, -1, optionalMetadata)
+    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
   }
 
   /** Store the bytes of received data as a data block into Spark's memory. */
   def pushBytes(
       bytes: ByteBuffer,
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
    ) {
-    val blockId = optionalBlockId.getOrElse(nextBlockId)
-    val time = System.currentTimeMillis
-    blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
-    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
-    reportPushedBlock(blockId, -1, optionalMetadata)
+    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
  }
 
-  /** Report pushed block */
-  def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
-    val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
-    trackerActor ! AddBlock(blockInfo)
-    logDebug("Reported block " + blockId)
+  /** Store block and report it to driver */
+  def pushAndReportBlock(
+      receivedBlock: ReceivedBlock,
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId]
+    ) {
+    val blockId = blockIdOption.getOrElse(nextBlockId)
+    val numRecords = receivedBlock match {
+      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
+      case _ => -1
+    }
+
+    val time = System.currentTimeMillis
+    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
+    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
+
+    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
+    Await.result(future, askTimeout)
+    logDebug(s"Reported block $blockId")
   }
 
   /** Report error to the receiver tracker */
streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

+1 −1

@@ -17,8 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import org.apache.spark.streaming.Time
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
 
 /**
  * :: DeveloperApi ::
