Skip to content

Commit

Permalink
Merge pull request #1 from antonioalf/anistal-feature/improvements-benchmark
Browse files Browse the repository at this point in the history

Anistal feature/improvements benchmark
  • Loading branch information
anistal committed Mar 3, 2016
2 parents ad6b077 + 9d3b5e1 commit 458ac3c
Show file tree
Hide file tree
Showing 288 changed files with 2,674 additions and 4,668 deletions.
20 changes: 1 addition & 19 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,7 @@
</dependency>
<dependency>
<groupId>com.stratio.sparkta</groupId>
<artifactId>field-default</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.stratio.sparkta</groupId>
<artifactId>operator-count</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.stratio.sparkta</groupId>
<artifactId>operator-sum</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.stratio.sparkta</groupId>
<artifactId>field-dateTime</artifactId>
<artifactId>plugins</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,14 @@ case class Cube(name: String,
* 4. Cube with no operators.
*/

def aggregate(dimensionsValues: DStream[(DimensionValuesTime, Row)])
def aggregate(dimensionsValues: DStream[(DimensionValuesTime, InputFields)])
: DStream[(DimensionValuesTime, MeasuresValues)] = {

val filteredValues = filterDimensionValues(dimensionsValues)
val associativesCalculated = if (associativeOperators.nonEmpty)
Option(updateAssociativeState(associativeAggregation(filteredValues)))
Option(updateAssociativeState(associativeAggregation(dimensionsValues)))
else None
val nonAssociativesCalculated = if (nonAssociativeOperators.nonEmpty)

Option(aggregateNonAssociativeValues(updateNonAssociativeState(filteredValues)))
Option(aggregateNonAssociativeValues(updateNonAssociativeState(dimensionsValues)))
else None

(associativesCalculated, nonAssociativesCalculated) match {
Expand All @@ -86,20 +84,6 @@ case class Cube(name: String,
}
}

/**
* Filter dimension values that correspond with the current cube dimensions
*/

protected def filterDimensionValues(dimensionValues: DStream[(DimensionValuesTime, Row)])
: DStream[(DimensionValuesTime, InputFields)] = {
dimensionValues.map { case (dimensionsValuesTime, aggregationValues) =>
val dimensionsFiltered = dimensionsValuesTime.dimensionValues.filter(dimVal =>
dimensions.exists(comp => comp.name == dimVal.dimension.name))

(dimensionsValuesTime.copy(dimensionValues = dimensionsFiltered), InputFields(aggregationValues, UpdatedValues))
}
}

private def updateFuncNonAssociativeWithTime =
(iterator: Iterator[(DimensionValuesTime, Seq[InputFields], Option[AggregationsValues])]) => {

Expand Down Expand Up @@ -262,7 +246,7 @@ case class Cube(name: String,

//scalastyle:on

protected def noAggregationsState(dimensionsValues: DStream[(DimensionValuesTime, Row)])
protected def noAggregationsState(dimensionsValues: DStream[(DimensionValuesTime, InputFields)])
: DStream[(DimensionValuesTime, MeasuresValues)] =
dimensionsValues.mapValues(aggregations =>
MeasuresValues(operators.map(op => op.key -> None).toMap))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ case class CubeMaker(cubes: Seq[Cube]) {

protected case class CubeOperations(cube: Cube) extends SLF4JLogging {

private final val UpdatedValues = 1

/**
* Extract a modified stream that will be needed to calculate aggregations.
*
* @param inputStream with the original stream of data.
* @return a modified stream after join dimensions, cubes and operations.
*/
def extractDimensionsAggregations(inputStream: DStream[Row]): DStream[(DimensionValuesTime, Row)] = {
inputStream.flatMap(row => Try {
def extractDimensionsAggregations(inputStream: DStream[Row]): DStream[(DimensionValuesTime, InputFields)] = {
inputStream.mapPartitions(rows => rows.flatMap(row => Try {
val dimensionValues = for {
dimension <- cube.dimensions
value = row.get(cube.initSchema.fieldIndex(dimension.field))
Expand All @@ -73,19 +75,21 @@ protected case class CubeOperations(cube: Cube) extends SLF4JLogging {

cube.expiringDataConfig match {
case None =>
(DimensionValuesTime(cube.name, dimensionValues), row)
(DimensionValuesTime(cube.name, dimensionValues), InputFields(row, UpdatedValues))
case Some(expiringDataConfig) =>
val eventTime = extractEventTime(dimensionValues)
val timeDimension = expiringDataConfig.timeDimension
(DimensionValuesTime(cube.name, dimensionValues, Option(TimeConfig(eventTime, timeDimension))), row)
(DimensionValuesTime(cube.name, dimensionValues, Option(TimeConfig(eventTime, timeDimension))),
InputFields(row, UpdatedValues))
}
} match {
case Success(dimensionValuesTime) => Some(dimensionValuesTime)
case Success(dimensionValuesTime) =>
Some(dimensionValuesTime)
case Failure(exception) =>
val error = s"Failure[Aggregations]: ${row.toString} | ${exception.getLocalizedMessage}"
log.error(error, exception)
None
})
}), true)
}

private def extractEventTime(dimensionValues: Seq[DimensionValue]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ case class CubeWriter(cube: Cube,
}
case None => log.warn(s"The output in the cube : $outputName not match in the outputs")
})
} else log.info("Empty event received")
} else log.debug("Empty event received")
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import java.io.{Serializable => JSerializable}
import java.sql.Timestamp

import com.github.nscala_time.time.Imports._
import com.stratio.sparkta.plugin.default.DefaultField
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{TimestampType, StringType, LongType, StructField, StructType}
import org.apache.spark.streaming.TestSuiteBase
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

import com.stratio.sparkta.plugin.field.datetime.DateTimeField
import com.stratio.sparkta.plugin.field.default.DefaultField
import com.stratio.sparkta.plugin.operator.count.CountOperator
import com.stratio.sparkta.sdk._

Expand All @@ -43,18 +43,7 @@ class CubeMakerTest extends TestSuiteBase {
- O = No operator for the cube
- R = Cube with D+B+O
This test should produce Seq[(Seq[DimensionValue], Map[String, JSerializable])] with values:
List(
((DimensionValuesTime(Seq(DimensionValue(
Dimension("dim1", "eventKey", "identity", defaultDimension), "value1")), timestamp), Map("eventKey" -> "value1")
),
(DimensionValuesTime(Seq(DimensionValue(
Dimension("dim1", "eventKey", "identity", defaultDimension), "value2")), timestamp), Map("eventKey" -> "value2")
),
(DimensionValuesTime(Seq(DimensionValue(
Dimension("dim1", "eventKey", "identity", defaultDimension), "value3")), timestamp), Map("eventKey" -> "value3")
))
This test should produce Seq[(Seq[DimensionValue], Map[String, JSerializable])]
*/
test("DataCube extracts dimensions from events") {

Expand Down Expand Up @@ -101,17 +90,17 @@ class CubeMakerTest extends TestSuiteBase {
* @return the expected result to test
*/
def getEventOutput(timestamp: Timestamp, millis: Long):
Seq[Seq[(DimensionValuesTime, Row)]] = {
Seq[Seq[(DimensionValuesTime, InputFields)]] = {
val dimensionString = Dimension("dim1", "eventKey", "identity", new DefaultField)
val dimensionTime = Dimension("minute", "minute", "minute", new DateTimeField)
val dimensionValueString1 = DimensionValue(dimensionString, "value1")
val dimensionValueString2 = dimensionValueString1.copy(value = "value2")
val dimensionValueString3 = dimensionValueString1.copy(value = "value3")
val dimensionValueTs = DimensionValue(dimensionTime, timestamp)
val tsMap = Row(timestamp)
val valuesMap1 = Row("value1", timestamp)
val valuesMap2 = Row("value2", timestamp)
val valuesMap3 = Row("value3", timestamp)
val valuesMap1 = InputFields(Row("value1", timestamp), 1)
val valuesMap2 = InputFields(Row("value2", timestamp), 1)
val valuesMap3 = InputFields(Row("value3", timestamp), 1)

Seq(Seq(
(DimensionValuesTime("cubeName", Seq(dimensionValueString1, dimensionValueTs)), valuesMap1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.joda.time.DateTime
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

import com.stratio.sparkta.plugin.field.default.DefaultField
import com.stratio.sparkta.plugin.default.DefaultField
import com.stratio.sparkta.plugin.operator.count.CountOperator
import com.stratio.sparkta.plugin.operator.sum.SumOperator
import com.stratio.sparkta.sdk._
Expand Down Expand Up @@ -64,26 +64,31 @@ class CubeTest extends TestSuiteBase {

testOperation(getInput, cube.aggregate, getOutput, PreserverOrder)

def getInput: Seq[Seq[(DimensionValuesTime, Row)]] = Seq(Seq(
def getInput: Seq[Seq[(DimensionValuesTime, InputFields)]] = Seq(Seq(
(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")),
timeConfig), Row(4)),
timeConfig), InputFields(Row(4), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig), Row(3)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig),
InputFields(Row(3), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo")), timeConfig), Row(3))),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo")), timeConfig),
InputFields(Row(3), 1))),

Seq(
(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig), Row(4)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig),
InputFields(Row(4), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig), Row(3)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar")), timeConfig),
InputFields(Row(3), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo")), timeConfig), Row(3))))
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo")), timeConfig),
InputFields(Row(3), 1))))

def getOutput: Seq[Seq[(DimensionValuesTime, MeasuresValues)]] = Seq(
Seq(
Expand Down Expand Up @@ -131,25 +136,25 @@ class CubeTest extends TestSuiteBase {

testOperation(getInput, cube.aggregate, getOutput, PreserverOrder)

def getInput: Seq[Seq[(DimensionValuesTime, Row)]] = Seq(Seq(
def getInput: Seq[Seq[(DimensionValuesTime, InputFields)]] = Seq(Seq(
(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), Row(4)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), InputFields(Row(4), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), Row(3)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), InputFields(Row(3), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo"))), Row(3))),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo"))), InputFields(Row(3), 1))),

Seq(
(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), Row(4)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), InputFields(Row(4), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), Row(3)),
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "bar"))), InputFields(Row(3), 1)),

(DimensionValuesTime("testCube",
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo"))), Row(3))))
Seq(DimensionValue(Dimension("dim1", "foo", "identity", defaultDimension), "foo"))), InputFields(Row(3), 1))))

def getOutput: Seq[Seq[(DimensionValuesTime, MeasuresValues)]] = Seq(
Seq(
Expand Down
4 changes: 2 additions & 2 deletions benchmark/src/main/resources/benchmark.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "benchmark",
"description": "No description",
"sparkStreamingWindow": 2000,
"checkpointPath": "checkpoint",
"checkpointPath": "/tmp/checkpoint",
"rawData": {
"enabled": "false",
"partitionFormat": "day",
Expand Down Expand Up @@ -120,4 +120,4 @@
}
}
]
}
}
Loading

0 comments on commit 458ac3c

Please sign in to comment.