Skip to content

Commit

Permalink
add more details to paper
Browse files Browse the repository at this point in the history
  • Loading branch information
fjy committed Oct 19, 2015
1 parent d8a1dd8 commit 89785b2
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 98 deletions.
2 changes: 1 addition & 1 deletion docs/content/design/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Druid currently allows for single-table queries in a similar manner to [Dremel](

As far as a comparison of systems is concerned, Druid sits in between PowerDrill and Dremel on the spectrum of functionality. It implements almost everything Dremel offers (Dremel handles arbitrary nested data structures while Druid only allows for a single level of array-based nesting) and gets into some of the interesting data layout and compression methods from PowerDrill.

Druid is a good fit for products that require real-time data ingestion of a single, large data stream. Especially if you are targeting no-downtime operation and are building your product on top of a time-oriented summarization of the incoming data stream. Druid is probably not the right solution if you care more about query flexibility and raw data access than query speed and no-downtime operation. When talking about query speed it is important to clarify what "fast" means: with Druid it is entirely within the realm of possibility (we have done it) to achieve queries that run in single-digit seconds across a 6TB data set.
Druid is a good fit for products that require real-time data ingestion of a single, large data stream. Especially if you are targeting no-downtime operation and are building your product on top of a time-oriented summarization of the incoming data stream. Druid is probably not the right solution if you care more about query flexibility and raw data access than query speed and no-downtime operation. When talking about query speed it is important to clarify what "fast" means: with Druid it is entirely within the realm of possibility (we have done it) to achieve queries that run in less than a second across terabytes of data.

### Architecture

Expand Down
Binary file added publications/radstack/figures/imps_clicks.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added publications/radstack/figures/joined.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added publications/radstack/figures/pipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added publications/radstack/figures/shuffled.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified publications/radstack/radstack.pdf
Binary file not shown.
226 changes: 129 additions & 97 deletions publications/radstack/radstack.tex
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@
% 1st. author
\alignauthor
Fangjin Yang\\
\affaddr{Metamarkets Group, Inc.}\\
\email{[email protected]}
\affaddr{Imply Data, Inc.}\\
\email{[email protected]}
% 2nd. author
\alignauthor
Gian Merlino\\
\affaddr{Metamarkets Group, Inc.}\\
\email{gian@metamarkets.com}
\affaddr{Imply Data, Inc.}\\
\email{gian@imply.io}
% 3rd. author
\alignauthor
Xavier Léauté\\
Expand All @@ -85,7 +85,7 @@
% in the \additional authors block, viz.
\additionalauthors{Nishant Bangarwa,
({\texttt{[email protected]}}), Eric Tschetter
({\texttt{echeddar@gmail.com}})}
({\texttt{cheddar@yahoo-inc.com}})}
% Just remember to make sure that the TOTAL number of authors
% is the number that will appear on the first page PLUS the
% number that will appear in the \additionalauthors section.
Expand Down Expand Up @@ -284,7 +284,7 @@ \section{Background}
common with using key/value stores, we precomputed the total set of queries we
anticipated users would make. An example of this precomputation is shown in
Figure~\ref{fig:precompute}. Our results with HBase are shown in
Table~\ref{tab:hbase_results} for 500, 000 records.
Table~\ref{tab:hbase_results} for 500,000 records.
Queries were acceptably fast with this solution as we were effectively doing
O(1) lookups into maps. However, the solution was not particularly flexible;
if something wasn't precomputed, it wasn’t queryable, and the precomputation
Expand All @@ -309,7 +309,7 @@ \section{Background}
as multi-stream joins. Pairing Druid with a stream processor enabled flexible
data processing and querying, but we still had problems with event delivery.
Our events were delivered from many different locations and sources, and peaked
at several hundred thousand events per second. We required a high throughput
at several million events per second. We required a high throughput
message bus that could hold these events for consumpation by our stream
processor. To simplify data transmission for our clients, we wanted the
message bus to be the single delivery endpoint for events entering our cluster.
Expand Down Expand Up @@ -425,16 +425,16 @@ \subsection{Filters}
bieberfever.com and ultratrimfast.com?”.

Consider the publisher column in Table~\ref{tab:sample_data}, a string column.
For each unique publisher in Table 1, our inverted index tells us in which
table rows a particular page is seen. Our inverted index looks like the
following:
For each unique publisher in Table 1, we can generate an inverted index that
tells us in which table rows a particular page is seen. Our inverted index
looks like the following:

{\small\begin{verbatim}
bieberfever.com -> rows [0, 1, 2] -> [1][1][1][0][0][0]
ultratrimfast.com -> rows [3, 4, 5] -> [0][0][0][1][1][1]
\end{verbatim}}

In the binary array, the array indices represent our rows, and the array values
In the inverted index, the array indices represent our rows, and the array values
indicate whether a particular value was seen. In our example, bieberfever.com
is seen in rows 0, 1 and 2. To know which rows contain bieberfever.com or
ultratrimfast.com, we can OR together the two arrays.
Expand Down Expand Up @@ -483,7 +483,7 @@ \subsection{Streaming Data Ingestion}

Real-time nodes employ a log structured merge tree\cite{o1996log} for recently
ingested data. Incoming events are first stored in an in-memory buffer. The
in-memory buffer is directly queryable and Druid behaves as a row store for
in-memory buffer is directly queryable and Druid behaves as a key/value store for
queries on events that exist in this JVM heap-based store. The in-memory buffer
is heavily write optimized, and given that Druid is really designed for heavy
concurrent reads, events do not remain in the in-memory buffer for very long.
Expand Down Expand Up @@ -567,7 +567,7 @@ \subsection{Hand off}
processing are versioned by the start of the segment granularity interval.

\subsection{Batch Data Ingestion}
The core component used by real-time ingestion is an index that can be
The core component used by real-time ingestion is a hash map that can be
incrementally populated and finalized to create an immutable segment. This core
component is shared across both real-time and batch ingestion. Druid has built
in support for creating segments by leveraging Hadoop and running MapReduce
Expand Down Expand Up @@ -643,41 +643,6 @@ \subsection{Unifying Views}
will be flagged in this timeline. When the coordinator notices overshadowed
segments, it tells historical nodes to drop these segments from the cluster.

\subsection{Queries}
Druid has its own query language and accepts queries as POST requests. Broker,
historical, and real-time nodes all share the same query API.

The body of the POST request is a JSON object containing key/value pairs
specifying various query parameters. A typical query will contain the data
source name, the granularity of the result data, time range of interest, the
type of request, and the metrics to aggregate over. The result will also be a
JSON object containing the aggregated metrics over the time period.

Most query types will also support a filter set. A filter set is a Boolean
expression of dimension name and value pairs. Any number and combination of
dimensions and values may be specified. When a filter set is provided, only the
subset of the data that pertains to the filter set will be scanned. The ability
to handle complex nested filter sets is what enables Druid to drill into data
at any depth.

Druid supports many types of aggregations including sums on floating-point and
integer types, minimums, maximums, and complex aggregations such as cardinality
estimation and approximate quantile estimation. The results of aggregations can
be combined in mathematical expressions to form other aggregations. It is
beyond the scope of this paper to fully describe the query API but more
information can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.

As of this writing, a join query for Druid is not yet implemented. Although
Druid’s storage format would allow for the implementation of joins (there is no
loss of fidelity for columns included as dimensions), implementation time is
costly and a strong enough use case has not yet arisen. When all sides of the
join are significantly large tables (> 1 billion records), materializing the
pre-join streams requires complex distributed memory management. The complexity
of the memory management is only amplified by the fact that we are targeting
highly concurrent, multi-tenant workloads. For these reasons, we’ve elected to
do joins at the processing layer for the time being.

\section{The Processing Layer}
\label{sec:processing}
Although Druid can ingest events that are streamed in one at a time, data must
Expand Down Expand Up @@ -707,20 +672,87 @@ \subsection{Stream Processing}
and empty strings, and filtering data. One pipeline may write to many data
sources in Druid.

Given that Druid does not support joins in queries, we require supporting joins
at the data processing level. Our approach to do streaming joins is to buffer
events for a configurable period of time. If an event arrives in the system
with a join key that exists in the buffer, the join occurs and the joined event
is transmitted further down the pipeline. If events are substantially delayed
and do not arrive in the allocated window period, they will not be joined. In
practice, this generally leads to one “primary” event continuing through the
pipeline and other secondary events with the same join key getting dropped.
This means that our stream processing layer is not guaranteed to deliver 100\%
accurate results. Furthermore, even without this restriction, Samza does not
offer exactly-once processing semantics. Problems in network connectivity or
node failure can lead to duplicated events. For these reasons, we run a
separate batch pipeline that generates a more accurate transformation of the
ingested data.
To understand a real-world pipeline, let's consider an example from online
advertising. In online advertising, events are generated by impressions (views)
of an ad and clicks of an ad. Many advertisers are interested in knowing how
many impressions of an ad converted into clicks. Impression streams and click
streams are often recorded as separate streams by ad servers and need to be
joined. An example of data generated by these two event streams is shown in
Figure~\ref{fig:imps_clicks}. Every event has a unique id or key that
identifies the ad served. We use this id as our join key.

\begin{figure}
\centering
\includegraphics[width = 2.6in]{imps_clicks}
\caption{
Ad impressions
and clicks are recorded in two separate streams. An event we want to join is
located in two different Kafka partitions on two different topics.
}
\label{fig:imps_clicks}
\end{figure}

Given that Druid does not support joins in queries, we need to do this join at
the data processing level. Our approach to do streaming joins is to buffer
events for a configurable period of time. We can leverage any key/value
database as the buffer, although we prefer one that has relatively high read
and write throughput. Events are typically buffered for 30 minutes. Once an
event arrives in the system with a join key that exists in the buffer, we
perform the first operation in our data pipeline: shuffling. A shuffle
operation writes events from our impressions and clicks streams to Kafka such
that the events that need to be joined are written to the same Kafka partition.
This is shown in Figure~\ref{fig:shuffled}.

\begin{figure}
\centering
\includegraphics[width = 2.6in]{shuffled}
\caption{
A shuffle operation ensures events to be joined at stored in the same Kafka
partition.
}
\label{fig:shuffled}
\end{figure}

The next stage in the data pipeline is to actually join the impression and
click. This is done by creating a new field in the data, called "is\_clicked".
This field is marked as "true" if a successful join occurs. This is shown in
Figure~\ref{fig:joined}

\begin{figure}
\centering
\includegraphics[width = 2.6in]{joined}
\caption{
The join operation adds a new field, "is\_clicked".
}
\label{fig:joined}
\end{figure}

The final stage of our data processing is to enhance the data. This stage
cleans up faults in data, and performs lookups and transforms of events. Once
data is cleaned, it is ready to be delivered to Druid for queries. The total
streaming data processing pipeline is shown in Figure~\ref{fig:pipeline}.

\begin{figure}
\centering
\includegraphics[width = 2.6in]{pipeline}
\caption{
The streaming processing data pipeline.
}
\label{fig:pipeline}
\end{figure}

The system we have designed is not perfect. Because we are doing windowed joins
and because events cannot be buffered indefinitely, not all joins are
guaranteed to complete. If events are substantially delayed and do not arrive
in the allocated window period, they will not be joined. In practice, this
generally leads to one “primary” event continuing through the pipeline and
other secondary events with the same join key getting dropped. This means that
our stream processing layer is not guaranteed to deliver 100\% accurate
results. Furthermore, even without this restriction, Samza does not offer
exactly-once processing semantics. Problems in network connectivity or node
failure can lead to duplicated events. For these reasons, we run a separate
batch pipeline that generates a more accurate transformation of the ingested
data.

The final job of our processing pipeline is to deliver data to Druid. For high
availability, processed events from Samza are transmitted concurrently to two
Expand Down Expand Up @@ -830,22 +862,33 @@ \subsection{Query Performance in Production}
selected 8 of our most queried data sources, described in
Table~\ref{tab:datasources}.

\begin{table}
\centering
\scriptsize\begin{tabular}{| l | l | l |}
\hline
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
\texttt{a} & 25 & 21 \\ \hline
\texttt{b} & 30 & 26 \\ \hline
\texttt{c} & 71 & 35 \\ \hline
\texttt{d} & 60 & 19 \\ \hline
\texttt{e} & 29 & 8 \\ \hline
\texttt{f} & 30 & 16 \\ \hline
\texttt{g} & 26 & 18 \\ \hline
\texttt{h} & 78 & 14 \\ \hline
\end{tabular}
\normalsize
\caption{Characteristics of production data sources.}
\label{tab:datasources}
\end{table}

Approximately 30\% of queries are standard aggregates involving different types
of metrics and filters, 60\% of queries are ordered group bys over one or more
dimensions with aggregates, and 10\% of queries are search queries and metadata
retrieval queries. The number of columns scanned in aggregate queries roughly
follows an exponential distribution. Queries involving a single column are very
frequent, and queries involving all columns are very rare.

\begin{figure}
\centering
\includegraphics[width = 2.3in]{avg_query_latency}
\includegraphics[width = 2.3in]{query_percentiles}
\caption{Query latencies of production data sources.}
\label{fig:query_latency}
\end{figure}
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]

\item There were
approximately 50 total data sources in this particular cluster and several hundred users issuing
queries.
Expand All @@ -862,25 +905,6 @@ \subsection{Query Performance in Production}
memory map the data instead of loading it into the Java heap.)
\end{itemize}

\begin{table}
\centering
\scriptsize\begin{tabular}{| l | l | l |}
\hline
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
\texttt{a} & 25 & 21 \\ \hline
\texttt{b} & 30 & 26 \\ \hline
\texttt{c} & 71 & 35 \\ \hline
\texttt{d} & 60 & 19 \\ \hline
\texttt{e} & 29 & 8 \\ \hline
\texttt{f} & 30 & 16 \\ \hline
\texttt{g} & 26 & 18 \\ \hline
\texttt{h} & 78 & 14 \\ \hline
\end{tabular}
\normalsize
\caption{Characteristics of production data sources.}
\label{tab:datasources}
\end{table}

Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
data sources, average query latency is approximately 550 milliseconds, with
Expand All @@ -890,6 +914,14 @@ \subsection{Query Performance in Production}
the broker nodes were compounded by very high query load on one of our
largest data sources.

\begin{figure}
\centering
\includegraphics[width = 2.3in]{avg_query_latency}
\includegraphics[width = 2.3in]{query_percentiles}
\caption{Query latencies of production data sources.}
\label{fig:query_latency}
\end{figure}

\begin{figure}
\centering
\includegraphics[width = 2.8in]{queries_per_min}
Expand All @@ -909,6 +941,14 @@ \subsection{Query Benchmarks on TPC-H Data}
In this case, queries requiring a substantial amount of work at the broker
level do not parallelize as well.

\begin{figure}
\centering
\includegraphics[width = 2.3in]{tpch_scaling}
\includegraphics[width = 2.3in]{tpch_scaling_factor}
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
\label{fig:tpch_scaling}
\end{figure}

Our Druid setup used Amazon EC2 \texttt{m3.2xlarge} instance types
(Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered}
E5-2680 v2 @ 2.80GHz) for historical nodes and \texttt{c3.2xlarge} instances
Expand All @@ -920,14 +960,6 @@ \subsection{Query Benchmarks on TPC-H Data}
time interval and 36,246,530 rows/second/core for a \texttt{select sum(float)}
type query.

\begin{figure}
\centering
\includegraphics[width = 2.3in]{tpch_scaling}
\includegraphics[width = 2.3in]{tpch_scaling_factor}
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
\label{fig:tpch_scaling}
\end{figure}

\subsection{Data Ingestion Performance}
\begin{table}
\centering
Expand Down

0 comments on commit 89785b2

Please sign in to comment.