SPARKC-122: Documentation updates for 1.2.0 release
- Fix a few examples to match current connector behavior
- Minor feature clarifications
- Minor text/grammar/spelling edits
- Fix links between pages
Brian Cantoni committed Apr 22, 2015
1 parent 0b1d582 commit 6319fbf
Showing 10 changed files with 67 additions and 42 deletions.
4 changes: 2 additions & 2 deletions doc/0_quick_start.md
@@ -9,11 +9,11 @@ and [Spark documentation](https://spark.apache.org/docs/0.9.1/).

### Prerequisites

Install and launch a Cassandra 2.0 cluster and a Spark cluster.
Install and launch a Cassandra cluster and a Spark cluster.

Configure a new Scala project with the following dependencies:

- Apache Spark 0.9 or 1.0 and its dependencies
- Apache Spark and its dependencies
- Apache Cassandra thrift and clientutil libraries matching the version of Cassandra
- DataStax Cassandra driver for your Cassandra version
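
For illustration, a minimal `build.sbt` sketch of such a project; the artifact versions shown here are assumptions and should be matched to your Spark and Cassandra installation:

```scala
// build.sbt (sketch only): versions are illustrative, not prescribed by this guide.
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.2.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0"
)
```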

4 changes: 3 additions & 1 deletion doc/10_embedded.md
@@ -16,4 +16,6 @@ See: [https://github.com/datastax/spark-cassandra-connector/tree/master/spark-ca
## How To Add The Dependency
Simply add this to your SBT build, or in the appropriate format for a Maven build:

"com.datastax.spark" %% "spark-cassandra-connector-embedded" % {latest.verson}
"com.datastax.spark" %% "spark-cassandra-connector-embedded" % {latest.verson}

[Next - Performance Monitoring](11_metrics.md)
4 changes: 2 additions & 2 deletions doc/11_metrics.md
@@ -48,7 +48,7 @@ read-task-timer | Timer to measure time of reading a single partition

### Compatibility
Codahale based metrics should work with either Spark 1.1.x or Spark 1.2.x. However task metrics
works only with Spark 1.2.x. Therefore, if this version of Spark Cassandra Connector is to be used with
work only with Spark 1.2.x. Therefore, if this version of Spark Cassandra Connector is to be used with
Spark 1.1.x, task based metrics have to be disabled.

[Next - Building And Artifacts](doc/12_building_and_artifacts.md)
[Next - Building And Artifacts](12_building_and_artifacts.md)
4 changes: 3 additions & 1 deletion doc/12_building_and_artifacts.md
@@ -19,6 +19,8 @@ For Scala 2.11 tasks:
sbt -Dscala-2.11=true package
sbt -Dscala-2.11=true assembly

**Note:** The Spark Java API is currently not buildable with Scala 2.11; see [SPARKC-130](https://datastax-oss.atlassian.net/browse/SPARKC-130).

#### Scala 2.10
To use Scala 2.10 nothing extra is required.

@@ -82,4 +84,4 @@ Then add this jar to your Spark executor classpath by adding the following line
spark.executor.extraClassPath spark-cassandra-connector/spark-cassandra-connector/target/scala-{binary.version}/spark-cassandra-connector-assembly-$CurrentVersion-SNAPSHOT.jar

This driver is also compatible with Spark distribution provided in
[DataStax Enterprise 4.5](http://www.datastax.com/documentation/datastax_enterprise/4.5/datastax_enterprise/newFeatures.html).
[DataStax Enterprise](http://datastax.com/docs/latest-dse/).
2 changes: 1 addition & 1 deletion doc/2_loading.md
@@ -239,7 +239,7 @@ and a Cassandra Table can be performed without doing a full table scan. When p
between two Cassandra Tables which share the same partition key this will *not* require movement of data between machines.
In all cases this method will use the source RDD's partitioning and placement for data locality.

`joinWithCassandraTable` is not effected by `cassandra.input.split.size` since partitions are automatically inherited from
`joinWithCassandraTable` is not affected by `cassandra.input.split.size` since partitions are automatically inherited from
the source RDD. The other input properties have their normal effects.
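
As a rough sketch of the call (the `test.customer_info` table and its `customer_id` partition key are hypothetical, chosen only to illustrate the lookup-by-key behavior):

```scala
import com.datastax.spark.connector._

// Hypothetical: test.customer_info is partitioned by customer_id INT.
// Each element of the source RDD is looked up by primary key; no full scan is issued.
case class CustomerId(customerId: Int)

val keys   = sc.parallelize(1 to 100).map(CustomerId(_))
val joined = keys.joinWithCassandraTable("test", "customer_info")
// joined is an RDD of (CustomerId, CassandraRow) pairs
```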

#### Join between two Cassandra Tables Sharing a Partition Key
10 changes: 8 additions & 2 deletions doc/3_selection.md
@@ -76,13 +76,13 @@ partition which is created for the RDD.

### Grouping rows by partition key

Physically, cassandra stores data already grouped by Cassandra partition key and ordered by clustering
Physically, Cassandra stores data already grouped by partition key and ordered by clustering
column(s) within each partition. As a single Cassandra partition never spans multiple Spark partitions,
it is possible to very efficiently group data by partition key without shuffling data around.
Call `spanBy` or `spanByKey` methods instead of `groupBy` or `groupByKey`:

```sql
CREATE TABLE events (year int, month int, ts timestamp, data varchar);
CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));
```

```scala
@@ -94,6 +94,12 @@ sc.cassandraTable("test", "events")
.spanByKey
```

Note: This only works for sequentially ordered data. Because data is ordered in Cassandra by the
clustering keys, all viable spans must follow the natural clustering key order.

This means in the above example that `spanBy` will be possible on (year), (year,month),
(year,month,ts) but not (month), (ts), or (month,ts).
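
A sketch of a valid span over the leading clustering prefix of the `events` table above:

```scala
import com.datastax.spark.connector._

// Group by (year, month), a prefix of the table's clustering order,
// so rows are spanned sequentially within each Spark partition without a shuffle.
val byMonth = sc.cassandraTable("test", "events")
  .spanBy(row => (row.getInt("year"), row.getInt("month")))
```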

The methods `spanBy` and `spanByKey` iterate every Spark partition locally
and put every RDD item into the same group as long as the key doesn't change.
Whenever the key changes, a new group is started. You need enough memory
9 changes: 5 additions & 4 deletions doc/4_mapper.md
@@ -68,7 +68,7 @@ table to the objects of class with fields `word: String` and `count: Int`:

```scala
case class WordCount(word: String, count: Int)
val result = sc.cassandraTable[WordCount]]("test", "words").select("word", "num" as "count").collect()
val result = sc.cassandraTable[WordCount]("test", "words").select("word", "num" as "count").collect()
```

The `as` method can be used for any type of projected value: normal column, TTL or write time:
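
For example, a sketch that projects the TTL of the `num` column under an alias, assuming the `test.words` table used earlier and that the string-to-column implicits expose `.ttl`:

```scala
import com.datastax.spark.connector._

// Hypothetical: map each word together with the remaining TTL of its "num" column.
case class WordWithTtl(word: String, ttlOfNum: Int)

val withTtl = sc.cassandraTable[WordWithTtl]("test", "words")
  .select("word", "num".ttl as "ttlOfNum")
```
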
@@ -103,14 +103,15 @@ INSERT INTO test.users (user_name, domain, password_hash, last_visit) VALUES ('j
We can map the rows of this table into pairs in the following ways:

```scala
import org.joda.time.DateTime
case class UserId(userName: String, domain: String)
case class UserData(passwordHash: String, lastVisit: DateTime)

sc.cassandraTable[KV[UserId, UserData]]("test", "users")
sc.cassandraTable[(UserId, UserData)]("test", "users")

sc.cassandraTable[KV[(String, String), UserData]]("test", "users")
sc.cassandraTable[((String, String), UserData)]("test", "users")

sc.cassandraTable[KV[(String, String), (String, DateTime)]]("test", "users")
sc.cassandraTable[((String, String), (String, DateTime))]("test", "users")
```

[Next - Saving data](5_saving.md)
29 changes: 20 additions & 9 deletions doc/5_saving.md
@@ -16,6 +16,11 @@ To save an `RDD` to a new table, instead of calling `saveToCassandra`, call `sav

## Saving a collection of tuples

Assume the following table definition:
```sql
CREATE TABLE test.words (word text PRIMARY KEY, count int);
```

```scala
val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
@@ -35,7 +40,8 @@ collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
## Saving a collection of objects
When saving a collection of objects of a user-defined class, the items to be saved
must provide appropriately named public property accessors for getting every column
to be saved. This example provides more information on property-column naming conventions is described [here](4_mapper.md).
to be saved. More information on property-column naming
conventions is described [here](4_mapper.md).

```scala
case class WordCount(word: String, count: Long)
@@ -58,16 +64,17 @@ The driver will execute a CQL `INSERT` statement for every object in the `RDD`,
grouped in unlogged batches. The consistency level for writes is `ONE`.

It is possible to specify custom column to property mapping with `SomeColumns`. If the property
names in objects, which are supposed to be saved, do not correspond to the column names in the
destination table, use `as` method on the column names which you want to override the mapping for.
names in objects to be saved do not correspond to the column names in the destination table, use
the `as` method on the column names you want to override. The parameter order is table column
name first, then object property name.

Example:
Say you want to save `WordCount` objects to the table which has column `word TEXT` and `num INT`.
Say you want to save `WordCount` objects to the table which has columns `word TEXT` and `num INT`.

```scala
case class WordCount(word: String, count: Long)
collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words2", SomeColumns("word", "count" as "num"))
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words2", SomeColumns("word", "num" as "count"))
```

## Saving objects of Cassandra User Defined Types
@@ -83,8 +90,9 @@ CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);

To create a new row in the `test.companies` table:
```scala
import com.datastax.spark.connector.UDTValue
case class Company(name: String, address: UDTValue)
val address = UDTValue.fromMap("city" -> "Santa Clara", "street" -> "Freedom Circle", number -> 3975)
val address = UDTValue.fromMap(Map("city" -> "Santa Clara", "street" -> "Freedom Circle", "number" -> 3975))
val company = Company("DataStax", address)
sc.parallelize(Seq(company)).saveToCassandra("test", "companies")
```
@@ -102,12 +110,15 @@ collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"
```

To customize the table definition, call `saveAsCassandraTableEx`. The following example
demonstrates how to add another column of int type to the table definition:
demonstrates how to add another column of int type to the table definition, creating new
table `words_new_2`:

```scala
import com.datastax.spark.connector.cql.{ColumnDef, RegularColumn, TableDef}
import com.datastax.spark.connector.types.IntType
case class WordCount(word: String, count: Long)
val table1 = TableDef.fromType[WordCount]("test", "words_new")
val table2 = TableDef("test", "words_new", table1.partitionKey, table1.clusteringColumns,
val table2 = TableDef("test", "words_new_2", table1.partitionKey, table1.clusteringColumns,
table1.regularColumns :+ ColumnDef("additional_column", RegularColumn, IntType))
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTableEx(table2, SomeColumns("word", "count"))
4 changes: 3 additions & 1 deletion doc/8_streaming.md
@@ -111,4 +111,6 @@ Start the computation:
For a more detailed description as well as tuning writes, see [Saving Data to Cassandra](5_saving.md).

### Find out more
http://spark.apache.org/docs/latest/streaming-programming-guide.html
http://spark.apache.org/docs/latest/streaming-programming-guide.html

[Next - Demos](9_demos.md)
39 changes: 20 additions & 19 deletions doc/9_demos.md
@@ -17,21 +17,21 @@ and [Java](https://github.com/datastax/spark-cassandra-connector/tree/master/spa
Most of the above functionality is covered in the Java API demo samples.

## Requirements

### Start Cassandra
Running a demo requires a local Cassandra instance to be running. This can be one node or a cluster.

All Scala demos create the Cassandra keyspaces and tables for you, however the Java demos do not.

In order to run the Java Demos, you will need to create the following keyspace, table and secondary index in Cassandra via cqlsh:
If you don't already have it, download the latest Apache Cassandra binaries, un-tar, and start Cassandra by invoking:

$CASSANDRA_HOME/bin/cassandra -f

### Cassandra Keyspace and Tables
All Scala demos create the Cassandra keyspaces and tables for you, however the Java demos do not. In order to run the Java Demos, you will need to create the following keyspace, table and secondary index in Cassandra via `cqlsh`:

CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE test.people (id INT, name TEXT, birth_date TIMESTAMP, PRIMARY KEY (id));
CREATE INDEX people_name_idx ON test.people(name);

### Start Cassandra
If you don't already have it, download the latest Apache Cassandra binaries, un-tar, and start Cassandra by invoking

$CASSANDRA_HOME/bin/cassandra -f

## Running Demos

### Settings
@@ -43,7 +43,7 @@ The basic demos (WordCountDemo, BasicReadWriteDemo, SQLDemo, AkkaStreamingDemo,
The Kafka streaming demo sets `spark.master` as 127.0.0.1 or `local[n]`, and `spark.cassandra.connection.host` as 127.0.0.1. Change this locally if desired.
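
For reference, a sketch of the same settings expressed through a `SparkConf` (the host and master values here are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: point a demo at a local master and a local Cassandra node.
val conf = new SparkConf(true)
  .setMaster("local[2]")
  .setAppName("connector-demo")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
```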

#### Twitter Streaming Demo
The Twitter streaming demo accepts java system properties passed in on the command line when invoking `sbt run`.
The Twitter streaming demo accepts Java system properties passed in on the command line when invoking `sbt run`.
Default configurations are set in the /resources/application.conf file as fallbacks.
One could run like this, for instance:

@@ -71,15 +71,15 @@ And then select which demo you want:

Multiple main classes detected, select one to run:

[1] com.datastax.spark.connector.demo.WordCountDemo
[2] com.datastax.spark.connector.demo.TableCopyDemo
[3] com.datastax.spark.connector.demo.AkkaStreamingDemo
[4] com.datastax.spark.connector.demo.JavaApiDemo
[5] com.datastax.spark.connector.demo.BasicReadWriteDemo
[6] com.datastax.spark.connector.demo.SQLDemo
[1] com.datastax.spark.connector.demo.AkkaStreamingDemo
[2] com.datastax.spark.connector.demo.BasicReadWriteDemo
[3] com.datastax.spark.connector.demo.JavaApiDemo
[4] com.datastax.spark.connector.demo.SQLDemo
[5] com.datastax.spark.connector.demo.TableCopyDemo
[6] com.datastax.spark.connector.demo.WordCountDemo

#### Running The Kafka Streaming Demo
Spark does not support kafka streaming or publish the `spark-streaming-kafka`
Spark does not support Kafka streaming or publish the `spark-streaming-kafka`
artifact in their Scala 2.11 build yet. Until then this is only available against Scala 2.10.
On the command line at the root of `spark-cassandra-connector`:

@@ -88,7 +88,7 @@ On the command line at the root of `spark-cassandra-connector`:
#### Running The Twitter Streaming Demo
First you need to set your Twitter auth credentials. This is required by Twitter.
The Twitter streaming sample expects these values to either already exist in the
deploy environment, and if not found, falls back to acquire from java system properties.
deploy environment, and if not found, falls back to acquire from Java system properties.

##### Twitter Authentication
To set Twitter credentials in your deploy environment:
@@ -121,13 +121,14 @@ Start a standalone master server by executing:
./sbin/start-master.sh

Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers
to it, or pass as the master argument to SparkContext. You can also find this URL on the masters web UI,
to it, or pass as the "master" argument to SparkContext. You can also find this URL on the master's web UI,
which is http://localhost:8080 by default.

Start one or more workers and connect them to the master via:

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

Once you have started a worker, look at the masters web UI (http://localhost:8080 by default).
Once you have started a worker, look at the master's web UI (http://localhost:8080 by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).

[Next - Embedded Connector](10_embedded.md)
