SPARKC-262: Set write consistency level to LOCAL_QUORUM.
With the current default write consistency level of LOCAL_ONE,
it is relatively easy to overwhelm a cluster, because the connector waits
for only one replica to acknowledge each write before proceeding.
Changing the default write consistency level to LOCAL_QUORUM introduces
some natural throttling, giving the cluster time to keep up with
writes to multiple replicas.
This should make for a more stable out-of-the-box experience.
pkolaczk committed Nov 30, 2015
1 parent 0c46644 commit b40c411
Showing 5 changed files with 6 additions and 7 deletions.
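For applications that prefer the previous behaviour, a minimal sketch of overriding the new default through the `spark.cassandra.output.consistency.level` property touched by this commit (the application name and setup are illustrative, not part of the change):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch, assuming a plain Spark application: restore the old
// LOCAL_ONE behaviour by overriding the property whose default this
// commit changes to LOCAL_QUORUM.
val conf = new SparkConf()
  .setAppName("consistency-override-example") // hypothetical app name
  .set("spark.cassandra.output.consistency.level", "LOCAL_ONE")

val sc = new SparkContext(conf)
```

The same property can also be supplied on the command line, e.g. `--conf spark.cassandra.output.consistency.level=LOCAL_ONE`.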
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -4,6 +4,7 @@
* Added support for tinyint and smallint types (SPARKC-269)
* Updated Java driver version to 3.0.0-alpha3.
* Changed the way CassandraConnectorSource is obtained due to SPARK-7171 (SPARKC-268)
+ * Change write ConsistencyLevel to LOCAL_QUORUM (SPARKC-262)

1.5.0 M2
* Bump Java Driver to 2.2.0-rc3, Guava to 16.0.1 and test against Cassandra 2.2.1 (SPARKC-229)
2 changes: 1 addition & 1 deletion doc/5_saving.md
@@ -81,7 +81,7 @@ collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
cow | 60
The driver will execute a CQL `INSERT` statement for every object in the `RDD`,
- grouped in unlogged batches. The consistency level for writes is `ONE`.
+ grouped in unlogged batches. The consistency level for writes is `LOCAL_QUORUM`.

It is possible to specify custom column to property mapping with `SomeColumns`. If the property
names in objects to be saved do not correspond to the column names in the destination table, use
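To make the documentation change above concrete, a small sketch of the custom column-to-property mapping the saving guide refers to; the `WordCount` class is hypothetical, the `as` aliasing is assumed from the connector's documented `SomeColumns` syntax, and `sc` is an already configured `SparkContext`:

```scala
import com.datastax.spark.connector._

// Hypothetical class whose property names (w, c) do not match the
// table's column names (word, count).
case class WordCount(w: String, c: Int)

val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))

// Map each table column to the corresponding property; the resulting
// INSERTs are grouped into unlogged batches and written at the new
// LOCAL_QUORUM default unless overridden.
collection.saveToCassandra("test", "words", SomeColumns("word" as "w", "count" as "c"))
```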
2 changes: 1 addition & 1 deletion doc/reference.md
@@ -228,7 +228,7 @@ in each row</td>
</tr>
<tr>
<td><code>output.consistency.level</code></td>
- <td>LOCAL_ONE</td>
+ <td>LOCAL_QUORUM</td>
<td>Consistency level for writing</td>
</tr>
<tr>
@@ -30,7 +30,7 @@ abstract class WritableToCassandra[T] {
* rdd.saveToCassandra("test", "words")
* }}}
*
- * By default, writes are performed at ConsistencyLevel.ONE in order to leverage data-locality and minimize network traffic.
+ * By default, writes are performed at ConsistencyLevel.LOCAL_QUORUM.
* This write consistency level is controlled by the following property:
* - spark.cassandra.output.consistency.level: consistency level for RDD writes, string matching the ConsistencyLevel enum name.
*
@@ -15,7 +15,7 @@ import org.apache.spark.SparkConf
* @param batchGroupingBufferSize the number of distinct batches that can be buffered before
* they are written to Cassandra
* @param batchGroupingKey which rows can be grouped into a single batch
- * @param consistencyLevel consistency level for writes, default LOCAL_ONE
+ * @param consistencyLevel consistency level for writes, default LOCAL_QUORUM
* @param parallelismLevel number of batches to be written in parallel
* @param ttl the default TTL value which is used when it is defined (in seconds)
* @param timestamp the default timestamp value which is used when it is defined (in microseconds)
@@ -57,7 +57,7 @@ object WriteConf {
val ConsistencyLevelParam = ConfigParameter[ConsistencyLevel](
name = "spark.cassandra.output.consistency.level",
section = ReferenceSection,
- default = ConsistencyLevel.LOCAL_ONE,
+ default = ConsistencyLevel.LOCAL_QUORUM,
description = """Consistency level for writing""")

val BatchSizeRowsParam = ConfigParameter[Option[Int]](
@@ -132,8 +132,6 @@
TaskMetricsParam
)



def fromSparkConf(conf: SparkConf): WriteConf = {

ConfigCheck.checkConfig(conf)
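A hedged sketch of a per-save override using the `WriteConf` case class shown above, assuming its other fields have defaults (as the parameter list suggests) and that the RDD save API accepts an explicit `writeConf` argument; keyspace, table, and sample data are illustrative:

```scala
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.WriteConf

// Override only the consistency level for this save; the remaining
// WriteConf fields keep their configured defaults.
val relaxedWrites = WriteConf(consistencyLevel = ConsistencyLevel.LOCAL_ONE)

sc.parallelize(Seq(("cat", 30), ("fox", 40)))
  .saveToCassandra("test", "words", SomeColumns("word", "count"), relaxedWrites)
```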
