Skip to content

Commit

Permalink
bug(query): binary join with ignoring and on bug fix (filodb#315)
Browse files Browse the repository at this point in the history
Extract the "ignoring" and "on" labels from the expression when they are specified, and use them to create the BinaryJoin logical plan
  • Loading branch information
TanviBhavsar authored Apr 10, 2019
1 parent 72c9951 commit 18f3bda
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ trait Expressions extends Aggregates with Functions {
val seriesPlanLhs = series.toPeriodicSeriesPlan(timeParams)
val seriesPlanRhs = rhs.asInstanceOf[PeriodicSeries].toPeriodicSeriesPlan(timeParams)
val cardinality = vectorMatch.map(_.cardinality.cardinality).getOrElse(Cardinality.OneToOne)
BinaryJoin(seriesPlanLhs, operator.getPlanOperator, cardinality, seriesPlanRhs)

val matcher = vectorMatch.flatMap(_.matching)
val onLabels = matcher.filter(_.isInstanceOf[On]).map(_.labels)
val ignoringLabels = matcher.filter(_.isInstanceOf[Ignoring]).map(_.labels)

BinaryJoin(seriesPlanLhs, operator.getPlanOperator, cardinality, seriesPlanRhs,
onLabels.getOrElse(Nil), ignoringLabels.getOrElse(Nil))

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import org.scalatest.{FunSpec, Matchers}

import filodb.prometheus.ast.TimeStepParams


//noinspection ScalaStyle
// scalastyle:off
class ParserSpec extends FunSpec with Matchers {
Expand Down Expand Up @@ -276,8 +275,12 @@ class ParserSpec extends FunSpec with Matchers {
"ApplyInstantFunction(Aggregate(Sum,PeriodicSeriesWithWindowing(RawSeries(IntervalSelector(1524855388000,1524855988000),List(ColumnFilter(__name__,Equals(http_request_duration_seconds_bucket))),List()),1524855988000,1000000,1524855988000,600000,Rate,List()),List(),List(le),List()),HistogramQuantile,List(0.9))",
"delta(cpu_temp_celsius{host=\"zeus\"}[2h])" ->
"PeriodicSeriesWithWindowing(RawSeries(IntervalSelector(1524848788000,1524855988000),List(ColumnFilter(host,Equals(zeus)), ColumnFilter(__name__,Equals(cpu_temp_celsius))),List()),1524855988000,1000000,1524855988000,7200000,Delta,List())",
"method_code:http_errors:rate5m{code=\"500\"} / ignoring(code) method:http_requests:rate5m" ->
"method_code:http_errors:rate5m{code=\"500\"} / method:http_requests:rate5m" ->
"BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(code,Equals(500)), ColumnFilter(__name__,Equals(method_code:http_errors:rate5m))),List()),1524855988000,1000000,1524855988000),DIV,OneToOne,PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method:http_requests:rate5m))),List()),1524855988000,1000000,1524855988000),List(),List())",
"method_code:http_errors:rate5m{code=\"500\"} / ignoring(code) method:http_requests:rate5m" ->
"BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(code,Equals(500)), ColumnFilter(__name__,Equals(method_code:http_errors:rate5m))),List()),1524855988000,1000000,1524855988000),DIV,OneToOne,PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method:http_requests:rate5m))),List()),1524855988000,1000000,1524855988000),List(),List(code))",
"method_code:http_errors:rate5m{code=\"500\"} / on(method) method:http_requests:rate5m" ->
"BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(code,Equals(500)), ColumnFilter(__name__,Equals(method_code:http_errors:rate5m))),List()),1524855988000,1000000,1524855988000),DIV,OneToOne,PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method:http_requests:rate5m))),List()),1524855988000,1000000,1524855988000),List(method),List())",
"histogram_quantile(0.9, rate(http_request_duration_seconds_bucket[10m]))" ->
"ApplyInstantFunction(PeriodicSeriesWithWindowing(RawSeries(IntervalSelector(1524855388000,1524855988000),List(ColumnFilter(__name__,Equals(http_request_duration_seconds_bucket))),List()),1524855988000,1000000,1524855988000,600000,Rate,List()),HistogramQuantile,List(0.9))",
"histogram_quantile(0.9, sum(rate(http_request_duration_seconds_bucket[10m])) by (job, le))" ->
Expand Down Expand Up @@ -312,8 +315,10 @@ class ParserSpec extends FunSpec with Matchers {
"PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000000,1524855988000)",
"http_requests_total{environment=~\"staging|testing|development\",method!=\"GET\"}" ->
"PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(environment,EqualsRegex(staging|testing|development)), ColumnFilter(method,NotEquals(GET)), ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000000,1524855988000)",

"method_code:http_errors:rate5m / ignoring(code) group_left method:http_requests:rate5m" ->
"BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method_code:http_errors:rate5m))),List()),1524855988000,1000000,1524855988000),DIV,OneToMany,PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method:http_requests:rate5m))),List()),1524855988000,1000000,1524855988000),List(),List())",
"BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method_code:http_errors:rate5m))),List()),1524855988000,1000000,1524855988000),DIV,OneToMany,PeriodicSeries(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(__name__,Equals(method:http_requests:rate5m))),List()),1524855988000,1000000,1524855988000),List(),List(code))",

"increase(http_requests_total{job=\"api-server\"}[5m])" ->
"PeriodicSeriesWithWindowing(RawSeries(IntervalSelector(1524855688000,1524855988000),List(ColumnFilter(job,Equals(api-server)), ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000000,1524855988000,300000,Increase,List())",
"sum(http_requests_total{method=\"GET\"} offset 5m)" ->
Expand All @@ -339,7 +344,7 @@ class ParserSpec extends FunSpec with Matchers {
lp.toString shouldEqual (e)
}
}

/**
  * Runs the given query string through `Parser.parseQuery` and returns whatever it yields.
  *
  * @param query the query text to parse
  */
private def parseSuccessfully(query: String) = Parser.parseQuery(query)
Expand Down
83 changes: 83 additions & 0 deletions query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ class BinaryJoinExecSpec extends FunSpec with Matchers with ScalaFutures {
}
}

// Left-hand-side fixture: two range vectors whose keys share "__name__", "tag2" and "job"
// and differ only in the "tag1" value ("tag1-0" / "tag1-1"). Rows are taken from data(idx).
val samplesLhsGrouping: Array[RangeVector] = Array.tabulate(2) { idx =>
  val lhsLabels = Map(
    "__name__".utf8 -> "someMetricLhs".utf8,
    "tag1".utf8 -> s"tag1-$idx".utf8,
    "tag2".utf8 -> "tag2-1".utf8,
    "job".utf8 -> "somejob".utf8)
  new RangeVector {
    val key: RangeVectorKey = CustomRangeVectorKey(lhsLabels)
    val rows: Iterator[RowReader] = data(idx).iterator
  }
}

// Right-hand-side fixture: two range vectors keyed by "__name__", "tag1" and "job" only
// (no "tag2" label). Only the "tag1" value differs between them; rows are taken from data(idx).
val samplesRhsGrouping: Array[RangeVector] = Array.tabulate(2) { idx =>
  val rhsLabels = Map(
    "__name__".utf8 -> "someMetricRhs".utf8,
    "tag1".utf8 -> s"tag1-$idx".utf8,
    "job".utf8 -> "somejob".utf8)
  new RangeVector {
    val key: RangeVectorKey = CustomRangeVectorKey(rhsLabels)
    val rows: Iterator[RowReader] = data(idx).iterator
  }
}

it("should join one-to-one without on or ignoring") {

val samplesRhs2 = scala.util.Random.shuffle(samplesRhs.toList) // they may come out of order
Expand Down Expand Up @@ -185,4 +206,66 @@ class BinaryJoinExecSpec extends FunSpec with Matchers with ScalaFutures {
e shouldBe a[BadQueryException]
}
}
// Exercises compose() on a one-to-one ADD join built with Nil and Seq("tag2") as its last two
// arguments (presumably the "on" and "ignoring" label lists, in that order - confirm against
// BinaryJoinExec). The LHS fixture carries a "tag2" label that the RHS fixture lacks.
it("should join one-to-one with ignoring") {

  val joinExec = BinaryJoinExec("someID", dummyDispatcher,
    Array(dummyPlan), // cannot be empty as some compose's rely on the schema
    new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
    BinaryOperator.ADD,
    Cardinality.OneToOne,
    Nil, Seq("tag2"))

  val columns = Seq(ColumnInfo("timestamp", ColumnType.LongColumn),
    ColumnInfo("value", ColumnType.DoubleColumn))

  // scalastyle:off
  val lhsResult = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializableRangeVector(rv, columns)))
  val rhsResult = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializableRangeVector(rv, columns)))
  // scalastyle:on

  // The RHS result is emitted before the LHS result, but each is tagged with its proper
  // child index (1 and 0); the join is expected to go by the index, not the arrival order.
  val joined = joinExec.compose(dataset, Observable.fromIterable(Seq((rhsResult, 1), (lhsResult, 0))), queryConfig)
    .toListL.runAsync.futureValue

  joined.foreach { rv =>
    val labels = rv.key.labelValues
    labels.contains("__name__".utf8) shouldEqual false
    labels.contains("tag1".utf8) shouldEqual true
    labels.contains("tag2".utf8) shouldEqual false
    // Every value in the joined vector must be twice the numeric suffix of its tag1 label.
    val tag1Suffix = labels("tag1".utf8).asNewString.split("-")(1)
    rv.rows.foreach(row => row.getDouble(1) shouldEqual tag1Suffix.toDouble * 2)
  }

  // Two distinct result keys are expected.
  val distinctKeys = joined.map(_.key).toSet
  distinctKeys.size shouldEqual 2
}

// Exercises compose() on a one-to-one ADD join built with Seq("tag1", "job") and Nil as its
// last two arguments (presumably the "on" and "ignoring" label lists, in that order - confirm
// against BinaryJoinExec). Both fixtures carry "tag1" and "job"; only the LHS carries "tag2".
it("should join one-to-one with on") {

  val joinExec = BinaryJoinExec("someID", dummyDispatcher,
    Array(dummyPlan), // cannot be empty as some compose's rely on the schema
    new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
    BinaryOperator.ADD,
    Cardinality.OneToOne,
    Seq("tag1", "job"), Nil)

  val columns = Seq(ColumnInfo("timestamp", ColumnType.LongColumn),
    ColumnInfo("value", ColumnType.DoubleColumn))

  // scalastyle:off
  val lhsResult = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializableRangeVector(rv, columns)))
  val rhsResult = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializableRangeVector(rv, columns)))
  // scalastyle:on

  // The RHS result is emitted before the LHS result, but each is tagged with its proper
  // child index (1 and 0); the join is expected to go by the index, not the arrival order.
  val joined = joinExec.compose(dataset, Observable.fromIterable(Seq((rhsResult, 1), (lhsResult, 0))), queryConfig)
    .toListL.runAsync.futureValue

  joined.foreach { rv =>
    val labels = rv.key.labelValues
    labels.contains("__name__".utf8) shouldEqual false
    labels.contains("tag1".utf8) shouldEqual true
    labels.contains("tag2".utf8) shouldEqual false
    // Every value in the joined vector must be twice the numeric suffix of its tag1 label.
    val tag1Suffix = labels("tag1".utf8).asNewString.split("-")(1)
    rv.rows.foreach(row => row.getDouble(1) shouldEqual tag1Suffix.toDouble * 2)
  }

  // Two distinct result keys are expected.
  val distinctKeys = joined.map(_.key).toSet
  distinctKeys.size shouldEqual 2
}
}

0 comments on commit 18f3bda

Please sign in to comment.