Skip to content

Commit

Permalink
start implement streams primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoblin committed Jan 4, 2018
1 parent fe2b9a6 commit 209bf3f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/ru/mg/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package ru.mg
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import ru.mg.csv.PaymentsStream
import ru.mg.detectors.Detectors._
import ru.mg.streams.CsvFilePaymentsStream

object Main extends LazyLogging {
def main(args: Array[String]): Unit = {
Expand All @@ -13,7 +13,7 @@ object Main extends LazyLogging {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input = PaymentsStream(env, "data/payments.csv")
val input = CsvFilePaymentsStream(env, "data/payments.csv")

frequentOutgoings(input)
.addSink(s => logger.info(s"$s"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package ru.mg.csv
package ru.mg.streams

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import ru.mg.csv.CsvStreamBuilder
import ru.mg.domain.payment.Payment

import ru.mg.domain.payment.Payments._

object PaymentsStream {
object CsvFilePaymentsStream {
def apply(env: StreamExecutionEnvironment, filePath: String): DataStream[Payment] ={
val csvStreamBuilder = new CsvStreamBuilder(filePath)

Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/ru/mg/detectors/FrequentOutgoingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import org.apache.flink.streaming.api.windowing.time.Time
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import ru.mg.csv.PaymentsStream
import ru.mg.detectors.Detectors._
import ru.mg.domain.fraud.Fraud
import ru.mg.domain.payment.{Payment, Person}
import ru.mg.streams.CsvFilePaymentsStream
import ru.mg.utils.{FileUtils, SinkCollector}

@RunWith(classOf[JUnitRunner])
Expand All @@ -23,7 +23,7 @@ class FrequentOutgoingSpec extends FlatSpec with Serializable with Matchers {
val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism = 1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input = PaymentsStream(env, tempFile.getAbsolutePath)
val input = CsvFilePaymentsStream(env, tempFile.getAbsolutePath)

SinkCollector.clear()
val collector = SinkCollector[Fraud]
Expand Down Expand Up @@ -55,7 +55,7 @@ class FrequentOutgoingSpec extends FlatSpec with Serializable with Matchers {
val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism = 1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input = PaymentsStream(env, tempFile.getAbsolutePath)
val input = CsvFilePaymentsStream(env, tempFile.getAbsolutePath)

SinkCollector.clear()
val collector = SinkCollector[Fraud]
Expand All @@ -76,7 +76,7 @@ class FrequentOutgoingSpec extends FlatSpec with Serializable with Matchers {
val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism = 1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input = PaymentsStream(env, tempFile.getAbsolutePath)
val input = CsvFilePaymentsStream(env, tempFile.getAbsolutePath)

SinkCollector.clear()
val collector = SinkCollector[Fraud]
Expand Down

0 comments on commit 209bf3f

Please sign in to comment.