Skip to content

Commit

Permalink
[MINOR] Add license headers and section titles to component README.md (
Browse files Browse the repository at this point in the history
  • Loading branch information
lresende authored Jan 19, 2020
1 parent 4f22586 commit 3912360
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 93 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->
# Apache Bahir

Apache Bahir provides extensions to distributed analytics platforms such as Apache Spark & Apache Flink.
Expand All @@ -8,7 +26,7 @@ Apache Bahir provides extensions to distributed analytics platforms such as Apac

The Initial Bahir source code (see issue [BAHIR-1](https://issues.apache.org/jira/browse/BAHIR-1)) containing the source for the Apache Spark streaming connectors for akka, mqtt, twitter, zeromq
extracted from [Apache Spark revision 8301fad](https://github.com/apache/spark/tree/8301fadd8d269da11e72870b7a889596e3337839)
(before the [deletion of the streaming connectors akka, mqtt, twitter, zeromq](https://issues.apache.org/jira/browse/SPARK-13843)).
(before the [deletion of the streaming connectors akka, mqtt, twitter, zeromq](https://issues.apache.org/jira/browse/SPARK-13843)).

## Source code structure

Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@
<exclude>.project</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/target/**</exclude>
<exclude>**/README.md</exclude>
<exclude>**/examples/data/*.txt</exclude>
<exclude>**/*.iml</exclude>
<exclude>**/src/main/resources/application.conf</exclude>
Expand Down
94 changes: 57 additions & 37 deletions sql-cloudant/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.

[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a
wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
geospatial indexing. The replication capabilities make it easy to keep data in sync between database
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->
# Apache CouchDB/Cloudant Data Source, Streaming Connector and SQL Streaming Data Source

A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.

[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a
wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
geospatial indexing. The replication capabilities make it easy to keep data in sync between database
clusters, desktop PCs, and mobile devices.

[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.
Expand All @@ -30,16 +50,16 @@ Unlike using `--jars`, using `--packages` ensures that this library and its depe
The `--packages` argument can also be used with `bin/spark-submit`.

Submit a job in Python:

spark-submit --master local[4] --packages org.apache.bahir:spark-sql-cloudant__{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} <path to python script>

Submit a job in Scala:

spark-submit --class "<your class>" --master local[4] --packages org.apache.bahir:spark-sql-cloudant__{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} <path to spark-sql-cloudant jar>

This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.

## Configuration options
## Configuration options
The configuration is obtained in the following sequence:

1. default in the Config, which is set in the application.conf
Expand All @@ -64,7 +84,7 @@ cloudant.host| |cloudant host url
cloudant.username| |cloudant userid
cloudant.password| |cloudant password
cloudant.numberOfRetries|3| number of times to replay a request that received a 429 `Too Many Requests` response
cloudant.useQuery|false|by default, `_all_docs` endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, `_find` endpoint will be used in place of `_all_docs` when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
cloudant.useQuery|false|by default, `_all_docs` endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, `_find` endpoint will be used in place of `_all_docs` when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
cloudant.queryLimit|25|the maximum number of results returned when querying the `_find` endpoint.
cloudant.storageLevel|MEMORY_ONLY|the storage level for persisting Spark RDDs during load when `cloudant.endpoint` is set to `_changes`. See [RDD Persistence section](https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) in Spark's Progamming Guide for all available storage level options.
cloudant.timeout|60000|stop the response after waiting the defined number of milliseconds for data. Only supported with `changes` endpoint.
Expand All @@ -74,12 +94,12 @@ jsonstore.rdd.minInPartition|10|the min rows in a partition.
jsonstore.rdd.requestTimeout|900000|the request timeout in milliseconds
bulkSize|200|the bulk save size
schemaSampleSize|-1|the sample size for RDD schema discovery. 1 means we are using only the first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs. Only supported with `_all_docs` endpoint.
createDBOnSave|false|whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
createDBOnSave|false|whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.

The `cloudant.endpoint` option sets ` _changes` or `_all_docs` API endpoint to be called while loading Cloudant data into Spark DataFrames or SQL Tables.

**Note:** When using `_changes` API, please consider:
1. Results are partially ordered and may not be be presented in order in
**Note:** When using `_changes` API, please consider:
1. Results are partially ordered and may not be be presented in order in
which documents were updated.
2. In case of shards' unavailability, you may see duplicate results (changes that have been seen already)
3. Can use `selector` option to filter Cloudant docs during load
Expand All @@ -90,31 +110,31 @@ which documents were updated.
When using `_all_docs` API:
1. Supports parallel reads (using offset and range) and partitioning.
2. Using partitions may not represent the true snapshot of a database. Some docs
may be added or deleted in the database between loading data into different
may be added or deleted in the database between loading data into different
Spark partitions.

If loading Cloudant docs from a database greater than 100 MB, set `cloudant.endpoint` to `_changes` and `spark.streaming.unpersist` to `false`.
This will enable RDD persistence during load against `_changes` endpoint and allow the persisted RDDs to be accessible after streaming completes.
See [CloudantChangesDFSuite](src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala)

See [CloudantChangesDFSuite](src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala)
for examples of loading data into a Spark DataFrame with `_changes` API.

### Configuration on Spark SQL Temporary Table or DataFrame

Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS:
Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS:

Name | Default | Meaning
--- |:---:| ---
bulkSize|200| the bulk save size
createDBOnSave|false| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
createDBOnSave|false| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
database| | Cloudant database name
index| | Cloudant Search index without the database name. Search index queries are limited to returning 200 results so can only be used to load data with <= 200 results.
path| | Cloudant: as database name if database is not present
schemaSampleSize|-1| the sample size used to discover the schema for this temp table. -1 scans all documents
selector|all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents when the `cloudant.endpoint` option is set to `_changes`. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
view| | Cloudant view w/o the database name. only used for load.

For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compount field. An example of loading data from a view:
For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compount field. An example of loading data from a view:

```python
spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
Expand All @@ -140,8 +160,8 @@ The above stated configuration keys can also be set using `spark-submit --conf`

### Python API

#### Using SQL In Python
#### Using SQL In Python

```python
spark = SparkSession\
.builder\
Expand All @@ -168,7 +188,7 @@ Submit job example:
spark-submit --packages org.apache.bahir:spark-sql-cloudant_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
```

#### Using DataFrame In Python
#### Using DataFrame In Python

```python
spark = SparkSession\
Expand All @@ -182,17 +202,17 @@ spark = SparkSession\

# ***1. Loading dataframe from Cloudant db
df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
df.cache()
df.cache()
df.printSchema()
df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
df.filter(df._id >= 'CAA').select("_id",'airportName').show()
```

See [CloudantDF.py](examples/python/CloudantDF.py) for examples.

In case of doing multiple operations on a dataframe (select, filter etc.),
you should persist a dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again.
Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively for large dbs to persist in memory & disk, use:
Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively for large dbs to persist in memory & disk, use:

```python
from pyspark import StorageLevel
Expand All @@ -203,7 +223,7 @@ df.persist(storageLevel = StorageLevel(True, True, False, True, 1))

### Scala API

#### Using SQL In Scala
#### Using SQL In Scala

```scala
val spark = SparkSession
Expand All @@ -216,7 +236,7 @@ val spark = SparkSession

// For implicit conversions of Dataframe to RDDs
import spark.implicits._

// create a temp table from Cloudant db and query it using sql syntax
spark.sql(
s"""
Expand All @@ -238,7 +258,7 @@ Submit job example:
spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION}}-tests.jar
```

### Using DataFrame In Scala
### Using DataFrame In Scala

```scala
val spark = SparkSession
Expand All @@ -250,12 +270,12 @@ val spark = SparkSession
.config("createDBOnSave","true") // to create a db on save
.config("jsonstore.rdd.partitions", "20") // using 20 partitions
.getOrCreate()

// 1. Loading data from Cloudant db
val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
// Caching df in memory to speed computations
// and not to retrieve data from cloudant again
df.cache()
df.cache()
df.printSchema()

// 2. Saving dataframe to Cloudant db
Expand All @@ -266,11 +286,11 @@ df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
```

See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples.

[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on using DataFrame option to define Cloudant configuration.
### Using Streams In Scala


### Using Streams In Scala

```scala
val ssc = new StreamingContext(sparkConf, Seconds(10))
Expand All @@ -297,13 +317,13 @@ ssc.start()
// run streaming for 120 secs
Thread.sleep(120000L)
ssc.stop(true)

```

See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples.

By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
specific documents, use `selector` option of `CloudantReceiver` and specify your conditions
By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
specific documents, use `selector` option of `CloudantReceiver` and specify your conditions
(See [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
example for more details):

Expand Down
Loading

0 comments on commit 3912360

Please sign in to comment.