Skip to content

Commit

Permalink
initial commit streaming app
Browse files Browse the repository at this point in the history
  • Loading branch information
Youssef Bentaleb committed Jan 6, 2022
1 parent a89d369 commit 31b433e
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
9 changes: 5 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<groupId>com.yousseefben</groupId>
<artifactId>KafkaStream</artifactId>
<version>1.0-SNAPSHOT</version>

Expand All @@ -30,9 +30,10 @@
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.15.0</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.7.0</version>
<scope>test</scope>
</dependency>


Expand Down
12 changes: 6 additions & 6 deletions src/main/java/MyKafkaStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KafkaStreams streamLocationFraude() {
.filter((key, value) -> value != null)
.peek((k, v) -> logger.info("event fraude key : {} value : {}", k, v))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(FraudDto::new, (k, v, fraud) -> {
fraud.setUsername(v.getDetails().getUsername());
fraud.setIpAddress(v.getIpAddress());
Expand Down Expand Up @@ -108,18 +108,18 @@ KafkaStreams deviceAttemptStream() {
StreamsBuilder builder = new StreamsBuilder();
final KTable<String, DeviceDto> knownDevices = builder.table(KNOWN_DEVICE_TOPIC, Consumed.with(stringSerde, deviceJsonSerde));

KStream<String, KeycloakDto> deviceStrem = builder
KStream<String, KeycloakDto> deviceStream = builder
.stream(LOGIN_ATTEMPT_TOPIC, Consumed.with(stringSerde, keycloakJsonSerde))
.filter((key, value) -> value != null)
.map((key, v) -> new KeyValue<>(v.getUsername() + ":" + getHashDevice(v.getDevice()), v))
.leftJoin(knownDevices, (left, right) -> {
if (right == null) return left;
return null;
})
.filter((key, value) -> value != null)
.peek((k, v) -> logger.info("new device attempt key: {} value : {}", k, v));
deviceStrem.to(NEW_DEVICE_ATTEMPT_TOPIC, Produced.with(stringSerde, keycloakJsonSerde));
deviceStrem
.filter((key, value) -> value != null);
deviceStream.to(NEW_DEVICE_ATTEMPT_TOPIC, Produced.with(stringSerde, keycloakJsonSerde));

deviceStream
.map((k, v) -> Utils.deviceFraudMapper(v))
.to(MALICIOUS_ATTEMPT_TOPIC, Produced.with(stringSerde, fraudSerde));

Expand Down
2 changes: 0 additions & 2 deletions src/main/java/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ static String getHashDevice(DeviceDto deviceDto) {
String deviceHash = deviceDto.getOs() + deviceDto.getOsVersion() + deviceDto.getDevice() + deviceDto.isMobile();
return Base64.getEncoder().encodeToString(deviceHash.getBytes(StandardCharsets.UTF_8));
}


}

0 comments on commit 31b433e

Please sign in to comment.