Distributed. Columnar. Versioned. Streaming. SQL.
```
    _______ __      ____  ____
   / ____(_) /___  / __ \/ __ )
  / /_  / / / __ \/ / / / __  |
 / __/ / / / /_/ / /_/ / /_/ /
/_/   /_/_/\____/_____/_____/
```
Columnar, versioned layers of data wrapped in a yummy high-performance analytical database engine.
Table of Contents
- Run analytical queries up to 100x faster on Spark SQL and Cassandra.
- Pre-requisites
- Getting Started
- Current Status
- Roadmap
- Building and Testing
- You can help!
FiloDB is a new open-source distributed, versioned, and columnar analytical database designed for modern streaming workloads.
- Distributed - FiloDB is designed from the beginning to run on best-of-breed distributed, scale-out storage platforms such as Apache Cassandra. Queries run in parallel in Apache Spark for scale-out ad-hoc analysis.
- Columnar - FiloDB brings breakthrough performance levels for analytical queries by using a columnar storage layout with different space-saving techniques like dictionary compression. True columnar querying techniques are on the roadmap. The current performance is comparable to Parquet, and one to two orders of magnitude faster than Spark on Cassandra 2.x for analytical queries. For the POC performance comparison, please see cassandra-gdelt repo.
- Versioned - At the same time, row-level and column-level operations plus built-in versioning give FiloDB far more flexibility than can be achieved with file-based technologies like Parquet alone.
- Designed for streaming - Enables easy exactly-once ingestion from Kafka for streaming events, time series, and IoT applications, yet still allows extremely fast ad-hoc analysis with the ease of use of SQL. Each row is keyed by a partition and sort key, and writes using the same key are idempotent. FiloDB does the hard work of keeping data stored in an efficient and sorted format.
FiloDB is easy to use! You can use Spark SQL for both ingestion (including from Streaming!) and querying.
Connect Tableau or any other JDBC analysis tool to Spark SQL, and easily ingest data from any source with Spark support (JSON, CSV, traditional databases, Kafka, etc.).
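As a rough sketch of what streaming ingestion could look like (the Kafka topic, the JSON event schema, and the use of `SaveMode.Append` are illustrative assumptions, not part of FiloDB's documented API; the `filodb.spark` write options are described in the Spark data source section below):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical sketch: consume JSON events from a Kafka topic and write each
// micro-batch to FiloDB via the filodb.spark data source.
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")   // assumed broker
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))                                  // assumed topic name

stream.foreachRDD { rdd =>
  val df = sqlContext.read.json(rdd.map(_._2))   // assumed JSON payload with hostname/timestamp fields
  df.write.format("filodb.spark").
    option("dataset", "events").
    option("sort_column", "timestamp").
    option("partition_column", "hostname").
    mode(SaveMode.Append).                       // assumes the connector accepts appends
    save()
}

ssc.start()
ssc.awaitTermination()
```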
FiloDB is a great fit for bulk analytical workloads, or streaming / event data. It is not optimized for heavily transactional, update-oriented workflows.
Overview presentation -- see the docs folder for design docs.
To compile the .mermaid source files to .png's, install the Mermaid CLI.
Your input is appreciated!
- True columnar querying and execution, using late materialization and vectorization techniques
- Use of GPU and SIMD instructions to speed up queries
- Support for many more data types and sort and partition keys - please give us your input!
- Non-Spark ingestion API. Your input is again needed.
- In-memory caching for significant query speedup
- Projections. Often-repeated queries can be sped up significantly with projections.
- Java 8
- SBT
- Apache Cassandra (We prefer using CCM for local testing)
- Apache Spark (1.4.x) (Not strictly needed if you only use CLI, but you probably want to use Spark for queries)
- Clone the project and cd into the project directory:

      $ git clone https://github.com/velvia/FiloDB.git
      $ cd FiloDB

- Start a Cassandra cluster. If it is not accessible at `localhost:9042`, update `core/src/main/resources/application.conf`.
- FiloDB can be used through the `filo-cli` or as a Spark data source. The CLI supports data ingestion from CSV files only; the Spark data source is better tested and richer in features.
There are two crucial parts to a dataset in FiloDB,
- partitioning column - decides how data is going to be distributed across the cluster
- sort column - acts as a primary key within each partition and decides how data will be sorted within each partition. Like the "clustering key" from Cassandra.
The PRIMARY KEY for FiloDB consists of (partition key, sort key). When choosing these values you must make sure the combination of the two is unique. This is very similar to Cassandra CQL tables, whose primary key consists of (partition columns, clustering columns). The partition key in FiloDB maps to the Cassandra partition key, and the sort key maps to the clustering key.
Specifying the partitioning column is optional. If a partitioning column is not specified, FiloDB will create a default one with a fixed value, which means everything will be stored on a single node; this is only suitable for small amounts of data. If you don't specify a partitioning column, you also have to make sure your sort keys are all unique.
This is one way I would recommend setting things up to take advantage of FiloDB.
The metric names are the column names. This lets you select on just one metric and effectively take advantage of columnar layout.
- Partition key = hostname
- Sort key = timestamp
- Columns: hostname, timestamp, CPU, load_avg, disk_usage, etc.
You can add more metrics/columns over time, but storing each metric in its own column is FAR FAR more efficient, at least in FiloDB. For example, disk usage metrics are likely to have very different numbers than load_avg, and so Filo can optimize the storage of each one independently. Right now I would store them as ints and longs if possible.
With the above layout, as long as there aren't too many hostnames and you set both the memtable max size and flush trigger to high numbers, you should get good read performance. Queries that would work well for the above layout:
- SELECT avg(load_avg), min(load_avg), max(load_avg) FROM metrics WHERE timestamp > t1 AND timestamp < t2 etc.
Queries that would work well once we expose a local Cassandra query interface:
- Select metrics from one individual host
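For illustration, here is a rough sketch of how a dataset with this layout could be ingested through the Spark data source described later in this README (the `metricsDF` DataFrame and the `metrics` dataset name are hypothetical; `partition_column` and `sort_column` are the documented write options):

```scala
import org.apache.spark.sql.SaveMode

// metricsDF is assumed to be a DataFrame with columns
// hostname, timestamp, CPU, load_avg, disk_usage, ...
metricsDF.write.format("filodb.spark").
  option("dataset", "metrics").
  option("partition_column", "hostname").   // partition key = hostname
  option("sort_column", "timestamp").       // sort key = timestamp
  mode(SaveMode.Overwrite).
  save()
```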
Another possible layout is something like this:
- Partition key = hostname % 1024 (or pick your # of shards)
- Sort key = hostname, timestamp
This will have to wait for the multiple-sort-key-column change of course.
The `filo-cli` accepts arguments as key-value pairs. The following keys are supported:
key | purpose |
---|---|
dataset | Required for all operations. Its value should be the name of the dataset. |
limit | Optional; used with `select`. Its value should be the number of rows required. |
columns | Required for defining the schema of a dataset. Its value should be a comma-separated string of the format `column1:typeOfColumn1,column2:typeOfColumn2`, where `column1` and `column2` are the names of the columns and `typeOfColumn1` and `typeOfColumn2` are one of `int`, `long`, `double`, `string`. |
sortColumn | Required for defining the schema. Its value should be the name of the column on which the data is to be sorted. |
command | Its value can be one of `init`, `create`, `importcsv`, or `list`. The `init` command creates the FiloDB schema. The `create` command defines a new dataset, for example: `./filo-cli --command create --dataset playlist --columns id:int,album:string,artist:string,title:string --sortColumn id` (note: the sort column is not optional). The `list` command can be used to view the schema of a dataset, for example: `./filo-cli --command list --dataset playlist` The `importcsv` command loads data from a CSV file into a dataset, for example: `./filo-cli --command importcsv --dataset playlist --filename playlist.csv` (note: the CSV file should be comma-delimited and have a header row; the column names must match those specified when creating the schema for that dataset). |
select | Its value should be a comma-separated string of the columns to be selected, e.g. `./filo-cli --dataset playlist --select album,title` The result from `select` is printed to the console by default. An output file can be specified with the key `--outfile`, for example: `./filo-cli --dataset playlist --select album,title --outfile playlist.csv` |
delimiter | Optional; used with the `importcsv` command. Its value should be the field delimiter character. The default value is comma. |
timeoutMinutes | The number of minutes before CSV ingestion times out. Needs to be greater than the maximum time to ingest the whole file. Defaults to 99. |
The following examples use the GDELT public dataset and can be run from the project directory.
Create a dataset with all the columns:
./filo-cli --command create --dataset gdelt --columns GLOBALEVENTID:int,SQLDATE:string,MonthYear:int,Year:int,FractionDate:double,Actor1Code:string,Actor1Name:string,Actor1CountryCode:string,Actor1KnownGroupCode:string,Actor1EthnicCode:string,Actor1Religion1Code:string,Actor1Religion2Code:string,Actor1Type1Code:string,Actor1Type2Code:string,Actor1Type3Code:string,Actor2Code:string,Actor2Name:string,Actor2CountryCode:string,Actor2KnownGroupCode:string,Actor2EthnicCode:string,Actor2Religion1Code:string,Actor2Religion2Code:string,Actor2Type1Code:string,Actor2Type2Code:string,Actor2Type3Code:string,IsRootEvent:int,EventCode:string,EventBaseCode:string,EventRootCode:string,QuadClass:int,GoldsteinScale:double,NumMentions:int,NumSources:int,NumArticles:int,AvgTone:double,Actor1Geo_Type:int,Actor1Geo_FullName:string,Actor1Geo_CountryCode:string,Actor1Geo_ADM1Code:string,Actor1Geo_Lat:double,Actor1Geo_Long:double,Actor1Geo_FeatureID:int,Actor2Geo_Type:int,Actor2Geo_FullName:string,Actor2Geo_CountryCode:string,Actor2Geo_ADM1Code:string,Actor2Geo_Lat:double,Actor2Geo_Long:double,Actor2Geo_FeatureID:int,ActionGeo_Type:int,ActionGeo_FullName:string,ActionGeo_CountryCode:string,ActionGeo_ADM1Code:string,ActionGeo_Lat:double,ActionGeo_Long:double,ActionGeo_FeatureID:int,DATEADDED:string,Actor1Geo_FullLocation:string,Actor2Geo_FullLocation:string,ActionGeo_FullLocation:string --sortColumn GLOBALEVENTID
Verify the dataset metadata:
./filo-cli --command list --dataset gdelt
Import data from a CSV file:
./filo-cli --command importcsv --dataset gdelt --filename GDELT-1979-1984-100000.csv
Query/export some columns:
./filo-cli --dataset gdelt --select MonthYear,Actor2Code --limit 5 --outfile out.csv
FiloDB has a Spark data-source module, `filodb.spark`, so you can use the Spark DataFrames `read` and `write` APIs with FiloDB. To use it, follow the steps below:
- Start Cassandra and update project configuration if required.
- From the FiloDB project directory, execute,
      $ sbt clean
      $ ./filo-cli --command init
      $ sbt spark/assembly
- Use the jar `FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.1-SNAPSHOT.jar` with Spark 1.4.x.
The options to use with the data-source api are:
option | value | command | optional |
---|---|---|---|
dataset | name of the dataset | read/write | No |
sort_column | name of the column according to which the data should be sorted | write | No |
partition_column | name of the column according to which data should be partitioned | write | Yes |
splits_per_node | number of read threads per node, defaults to 1 | read | Yes |
default_partition_key | default value to use for the partition key if the partition_column has a null value. If not specified, an error is thrown. Note that this only has an effect if the dataset is created for the first time. | write | Yes |
version | numeric version of data to write, defaults to 0 | write | Yes |
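As a hedged example of combining the optional keys above (it assumes the spark-shell setup and the `csvDF` DataFrame from the steps below; the `default_partition_key` value is arbitrary):

```scala
import org.apache.spark.sql.SaveMode

// Write a specific data version, with a fallback value for rows where the
// partition column is null.
csvDF.write.format("filodb.spark").
  option("dataset", "gdelt").
  option("sort_column", "GLOBALEVENTID").
  option("partition_column", "MonthYear").
  option("default_partition_key", "0").   // arbitrary fallback for null MonthYear values
  option("version", "1").
  mode(SaveMode.Overwrite).save()

// Read back with more parallelism: 4 read threads per node.
val df = sqlContext.read.format("filodb.spark").
  option("dataset", "gdelt").
  option("splits_per_node", "4").
  load()
```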
You can follow along using the Spark Notebook in `doc/FiloDB.snb`. Launch the notebook using `EXTRA_CLASSPATH=$FILO_JAR ADD_JARS=$FILO_JAR ./bin/spark-notebook &`, where `FILO_JAR` is the path to the `filodb-spark-assembly` jar.
Or you can start a spark-shell locally,
bin/spark-shell --jars ../FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.1-SNAPSHOT.jar --packages com.databricks:spark-csv_2.10:1.2.0 --driver-memory 3G --executor-memory 3G
Loading CSV file from Spark:
scala> val csvDF = sqlContext.read.format("com.databricks.spark.csv").
option("header", "true").option("inferSchema", "true").
load("../FiloDB/GDELT-1979-1984-100000.csv")
Creating a dataset from a Spark DataFrame,
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> csvDF.write.format("filodb.spark").
option("dataset", "gdelt").
option("sort_column", "GLOBALEVENTID").
mode(SaveMode.Overwrite).save()
Or, specifying the `partition_column`,
scala> csvDF.write.format("filodb.spark").
option("dataset", "gdelt").
option("sort_column", "GLOBALEVENTID").
option("partition_column", "MonthYear").
mode(SaveMode.Overwrite).save()
Note that for efficient columnar encoding, wide rows with fewer partition keys are better for performance.
Reading the dataset,
val df = sqlContext.read.format("filodb.spark").option("dataset", "gdelt").load()
The dataset can be queried using the DataFrame DSL. See the section Querying Datasets for examples.
Start Spark-SQL:
bin/spark-sql --jars path/to/FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.1-SNAPSHOT.jar
Create a temporary table using an existing dataset,
create temporary table gdelt
using filodb.spark
options (
dataset "gdelt"
);
Then, start running SQL queries!
Create a table using the method `saveAsFiloDataset`:
scala> import filodb.spark._
import filodb.spark._
scala> sqlContext.saveAsFiloDataset(myDF, "table1", sortCol, partCol, createDataset=true)
Read using `filoDataset`:
scala> val df = sqlContext.filoDataset("gdelt")
15/06/04 15:21:41 INFO DCAwareRoundRobinPolicy: Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
15/06/04 15:21:41 INFO Cluster: New Cassandra host localhost/127.0.0.1:9042 added
15/06/04 15:21:41 INFO FiloRelation: Read schema for dataset gdelt = Map(ActionGeo_CountryCode -> Column(ActionGeo_CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Geo_FullName -> Column(Actor1Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Name -> Column(Actor2Name,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_ADM1Code -> Column(ActionGeo_ADM1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2CountryCode -> Column(Actor2CountryCode,gdelt,0,StringColumn,FiloSerializer,fals...
Now do some queries, using the DataFrame DSL:
scala> df.select(count(df("MonthYear"))).show()
...<skipping lots of logging>...
COUNT(MonthYear)
4037998
or SQL, to find the top 15 events with the highest tone:
scala> df.registerTempTable("gdelt")
scala> sqlContext.sql("SELECT Actor1Name, Actor2Name, AvgTone FROM gdelt ORDER BY AvgTone DESC LIMIT 15").collect()
res13: Array[org.apache.spark.sql.Row] = Array([208077.29634561483])
Now, how about something uniquely Spark... feed SQL query results to MLlib to compute a correlation:
scala> import org.apache.spark.mllib.stat.Statistics
scala> val numMentions = df.select("NumMentions").map(row => row.getInt(0).toDouble)
numMentions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[100] at map at DataFrame.scala:848
scala> val numArticles = df.select("NumArticles").map(row => row.getInt(0).toDouble)
numArticles: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[104] at map at DataFrame.scala:848
scala> val correlation = Statistics.corr(numMentions, numArticles, "pearson")
- The storage format is subject to change at this time.
- Only ingestion through Spark / Spark Streaming, and CLI ingestion via CSV files.
- Only String, Int, and Long partition keys and Long/Timestamp/Int/Double sort keys are supported, but many more are to come.
- CSV export from CLI will only read data from one node of a cluster.
Run the tests with `sbt test`, or for continuous development, `sbt ~test`. Noisy Cassandra logs can be seen in `filodb-test.log`.
- Send your use cases for OLAP on Cassandra and Spark
- Especially IoT and Geospatial
- Email if you want to contribute
Your feedback will help decide the next batch of features, such as:
- which data types to add support for
- what architecture is best supported