-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Flink Stream Processing demo on DC/OS 2.0 (#89)
Small updates required for the Flink stream processing demo running on DC/OS 2.0.
- Loading branch information
Showing
31 changed files
with
830 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
# Fast Data: Financial Transaction Processing with Apache Flink | ||
|
||
During this demo we use [Apache Flink](https://flink.apache.org) and [Apache Kafka](https://kafka.apache.org) to setup a high-volume financial transactions pipeline. | ||
|
||
- Estimated time for completion: | ||
- Manual install: 15 minutes | ||
- Target audience: Anyone interested in stream data processing and analytics with Apache Kafka and Apache Flink. | ||
|
||
A video of this demo can be found [here](https://www.youtube.com/watch?v=bwPXNlVHTeI). | ||
|
||
**Table of Contents**: | ||
|
||
- [Architecture](#architecture) | ||
- [Prerequisites](#prerequisites) | ||
- [Install](#install) | ||
- [Use the demo](#use) | ||
|
||
## Architecture | ||
|
||
 | ||
|
||
This demo implements a data processing infrastructure that is able to spot money laundering. In the context of money laundering, we want to detect amounts larger than $10.000 transferred between two accounts, even if that amount is split into many small batches. See also [US](https://www.fincen.gov/history-anti-money-laundering-laws) and [EU](http://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32015L0849) legislation and regulations on this topic for more information. | ||
|
||
The architecture follows more or less the [SMACK stack architecture](https://d2iq.com/blog/smack-stack-new-lamp-stack): | ||
|
||
- Events: Event are being generated by a small [generator](https://github.com/dcos/demos/blob/master/flink/2.0/generator/generator.go). The events are in the form 'Sunday, 23-Jul-17 01:06:47 UTC;66;26;7810', where the first field '23-Jul-17 01:06:47 UTC' represents the (increasing) timestamp of transactions; the second field '66' represent the sender account; the third field the receiver account; and the fourth field represents the dollar amount transferred during that transaction. | ||
- Ingestion: The generated events are being ingested and buffered by a Kafka queue with the default topic 'transactions'. | ||
- Stream Processing: As we require fast response times, we use Apache Flink as a Stream processor running the [FinancialTransactionJob](https://github.com/dcos/demos/tree/master/flink/2.0/flink-job/src/main/java/io/dcos). | ||
- Storage: Here we diverge a bit from the typical SMACK stack setup and don't write the results into a Datastore such as Apache Cassandra. Instead we write the results again into a Kafka Stream (default: 'fraud'). Note, that Kafka also offers data persistence for all unprocessed events. | ||
- Actor: In order to view the results we use again a small [Golang viewer](https://github.com/dcos/demos/blob/master/flink/2.0/actor/actor_viewer.go) which simply reads and displays the results from the output Kafka stream. | ||
|
||
## Prerequisites | ||
|
||
- A running [DC/OS 2.0](https://dcos.io/releases/) or higher cluster with at least 3 private agents and 1 public agent. Each agent should have 2 CPUs and 5 GB of RAM available. The [DC/OS CLI](https://docs.d2iq.com/mesosphere/dcos/2.0/cli/) also needs to be installed. | ||
- The [dcos/demo](https://github.com/dcos/demos/) Git repo must be available locally, use: `git clone https://github.com/dcos/demos/` if you haven't done so yet. | ||
- [SSH](https://docs.d2iq.com/mesosphere/dcos/2.0/administering-clusters/sshcluster/) cluster access must be set up. | ||
|
||
The DC/OS services used in the demo are as follows: | ||
|
||
- Apache Kafka | ||
- Apache Flink | ||
|
||
## Install | ||
|
||
#### Kafka | ||
|
||
Install the Apache Kafka package : | ||
|
||
```bash | ||
dcos package install kafka | ||
``` | ||
|
||
Note that if you are unfamiliar with Kafka and its terminology, you can check out the respective [101 example](https://github.com/dcos/examples/tree/master/kafka). | ||
|
||
Next, figure out where the broker is: | ||
|
||
```bash | ||
$ dcos kafka endpoints broker | ||
{ | ||
"address": [ | ||
"10.0.2.64:1025", | ||
"10.0.2.83:1025", | ||
"10.0.0.161:1025" | ||
], | ||
"dns": [ | ||
"kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1025", | ||
"kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1025", | ||
"kafka-2-broker.kafka.autoip.dcos.thisdcos.directory:1025" | ||
], | ||
"vip": "broker.kafka.l4lb.thisdcos.directory:9092" | ||
} | ||
``` | ||
|
||
Note the FQDN for the vip, in our case `broker.kafka.l4lb.thisdcos.directory:9092`, which is independent of the actual broker locations. | ||
It is possible to use the FQDN of any of the brokers, but using the VIP FQDN will give us load balancing. | ||
|
||
##### Create Kafka Topics | ||
|
||
Fortunately, creating a topic is very simple using the DC/OS Kafka CLI. If you have installed Kafka from the UI you might have to install the cli extensions using `dcos package install kafka --cli`. If you installed Kafka as above using the CLI then it will automatically install the CLI extensions. | ||
|
||
We need two Kafka topics, one with the generated transactions and one for fraudulent transactions, which we can create with: | ||
|
||
``` bash | ||
dcos kafka topic create transactions | ||
``` | ||
|
||
and: | ||
|
||
``` bash | ||
dcos kafka topic create fraud | ||
``` | ||
|
||
### Generator | ||
|
||
After the Kafka queues are created, we can now deploy the [data generator](https://github.com/dcos/demos/blob/master/flink/2.0/generator/generator.go) with this [Marathon app definition](https://github.com/dcos/demos/blob/master/flink/2.0/generator/generator.json). As our 'generator' is written in Go and contains all its dependencies, the app definition doesn't require a container image and is running without Docker. | ||
|
||
``` bash | ||
dcos marathon app add https://raw.githubusercontent.com/dcos/demos/master/flink/2.0/generator/generator.json | ||
``` | ||
|
||
### Flink | ||
|
||
Finally, we can deploy [Apache Flink](https://github.com/dcos/examples/tree/master/flink/2.0) : | ||
|
||
```bash | ||
dcos package install flink | ||
``` | ||
|
||
At this point we have all of the required elements installed - Kafka, Flink and the data generator. We are now ready to start the demo. | ||
|
||
### Final View | ||
After install your DC/OS UI should look as follows: | ||
|
||
 | ||
|
||
## Use | ||
|
||
The core piece of this demo is the [FinancialTransactionJob](https://github.com/dcos/demos/tree/master/flink/2.0/flink-job/src/main/java/io/dcos) which we will submit to Flink. | ||
|
||
First we need to upload the [jar file](https://downloads.mesosphere.com/dcos-demo/flink/flink-job-1.0.jar) into Flink. Please note that the jar file is too large to be included in this github repo, but can be downloaded [here](https://downloads.mesosphere.com/dcos-demo/flink/flink-job-1.0.jar). | ||
|
||
In the Services tab of the DCOS UI, hover over the name of the flink service, and click on the link which appears to the right of it. This will open the Flink web UI in a new tab. | ||
|
||
 | ||
|
||
In the Flink web UI, click on Submit New Job, then click the Add New button. This will allow you to select the jar file from $DEMO_HOME and upload it. | ||
|
||
 | ||
|
||
Once we hit Submit, we should see the job begin to run in the Flink web UI. | ||
|
||
 | ||
|
||
### Viewing Output | ||
|
||
Now once the Flink job is running, we only need a way to visualize the results. We do that with another [simple program](https://github.com/dcos/demos/blob/master/flink/2.0/actor/actor_viewer.go): | ||
|
||
``` bash | ||
dcos marathon app add https://raw.githubusercontent.com/dcos/demos/master/flink/2.0/actor/fraudDisplay.json | ||
``` | ||
|
||
We can easily check the output by checking the task output from the UI. | ||
|
||
 | ||
|
||
Should you have any questions or suggestions concerning the demo, please raise an issue either in GitHub or via our [Jira](https://jira.mesosphere.com/), or let us know using the [[email protected]](mailto:[email protected]) mailing list or [community Slack](https://chat.dcos.io). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
FROM scratch | ||
ADD fraudDisplay-linux / | ||
CMD ["/fraudDisplay-linux"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
build: | ||
docker run --rm -v $(PWD):/usr/src/actor -w /usr/src/actor \ | ||
golang:1.12.5 \ | ||
go get -t -v github.com/Shopify/sarama && \ | ||
GOOS=linux GOARCH=386 CGO_ENABLED=0 \ | ||
go build -ldflags="-s -w" -o fraudDisplay-linux | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
|
||
package main | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"os/signal" | ||
|
||
"github.com/Shopify/sarama" | ||
) | ||
|
||
func main() { | ||
|
||
config := sarama.NewConfig() | ||
config.Consumer.Return.Errors = true | ||
|
||
// Specify brokers address. This is default one | ||
brokers := []string{"broker.kafka.l4lb.thisdcos.directory:9092"} | ||
|
||
// Create new consumer | ||
master, err := sarama.NewConsumer(brokers, config) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
defer func() { | ||
if err := master.Close(); err != nil { | ||
panic(err) | ||
} | ||
}() | ||
|
||
topic := "fraud" | ||
consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
signals := make(chan os.Signal, 1) | ||
signal.Notify(signals, os.Interrupt) | ||
|
||
// Count how many message processed | ||
msgCount := 0 | ||
|
||
// Get signnal for finish | ||
doneCh := make(chan struct{}) | ||
go func() { | ||
for { | ||
select { | ||
case err := <-consumer.Errors(): | ||
fmt.Println(err) | ||
case msg := <-consumer.Messages(): | ||
msgCount++ | ||
fmt.Println() | ||
fmt.Println("Detected Fraud: ", string(msg.Key), string(msg.Value)) | ||
case <-signals: | ||
fmt.Println("Interrupt is detected") | ||
doneCh <- struct{}{} | ||
} | ||
} | ||
}() | ||
|
||
<-doneCh | ||
fmt.Println("Processed", msgCount, "messages") | ||
} |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"id": "display", | ||
"instances": 1, | ||
"cpus": 0.1, | ||
"mem": 32, | ||
"fetch": [ | ||
{ | ||
"uri": "https://raw.githubusercontent.com/dcos/demos/master/flink/1.13/actor/fraudDisplay-linux" | ||
} | ||
], | ||
"cmd": "chmod u+x fraudDisplay-linux && ./fraudDisplay-linux" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
target/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<groupId>io.dcos</groupId> | ||
<artifactId>flink-job</artifactId> | ||
<version>1.0</version> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<configuration> | ||
<source>1.7</source> | ||
<target>1.7</target> | ||
</configuration> | ||
</plugin> | ||
<plugin> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
<configuration> | ||
<transformers> | ||
<transformer> | ||
<manifestEntries> | ||
<Main-Class>io.dcos.FinancialTransactionJob</Main-Class> | ||
</manifestEntries> | ||
</transformer> | ||
</transformers> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>io.dcos</groupId> | ||
<artifactId>flink-job</artifactId> | ||
<version>1.0</version> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<configuration> | ||
<source>1.7</source> | ||
<target>1.7</target> | ||
</configuration> | ||
</plugin> | ||
<!--<plugin>--> | ||
<!--<groupId>org.apache.maven.plugins</groupId>--> | ||
<!--<artifactId>maven-assembly-plugin</artifactId>--> | ||
<!--<configuration>--> | ||
<!--<archive>--> | ||
<!--<manifest>--> | ||
<!--<mainClass>io.dcos.FinancialTransactionJob</mainClass>--> | ||
<!--</manifest>--> | ||
<!--</archive>--> | ||
<!--<descriptorRefs>--> | ||
<!--<descriptorRef>jar-with-dependencies</descriptorRef>--> | ||
<!--</descriptorRefs>--> | ||
<!--</configuration>--> | ||
<!--</plugin>--> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
<configuration> | ||
<transformers> | ||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> | ||
<manifestEntries> | ||
<Main-Class>io.dcos.FinancialTransactionJob</Main-Class> | ||
</manifestEntries> | ||
</transformer> | ||
</transformers> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-java</artifactId> | ||
<version>1.2.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-streaming-java_2.10</artifactId> | ||
<version>1.2.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-connector-kafka-0.9_2.10</artifactId> | ||
<version>1.2.0</version> | ||
</dependency> | ||
</dependencies> | ||
</project> |
Oops, something went wrong.