Skip to content

Commit

Permalink
[SPARK-38532][SS][TESTS] Add test case for invalid gapDuration of ses…
Browse files Browse the repository at this point in the history
…sionwindow

### What changes were proposed in this pull request?

Since the dynamic gapduration has been added in the session window,[apache#33691](apache#33691) users are allowed to enter invalid gapduration ,then filter invalide events . However, for now, test cases are only added for zero and negative gapduration. I think it is necessary to add test cases for invalid gapduration.

### Why are the changes needed?

Described in above section.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the GA.

Closes apache#35824 from nyingping/AddtestcaseForInvalidGapDuration.

Lead-authored-by: nyingping <[email protected]>
Co-authored-by: Nie yingping <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
nyingping authored and HeartSaVioR committed Mar 14, 2022
1 parent 83673c8 commit 715a06c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4062,7 +4062,7 @@ object SessionWindowing extends Rule[LogicalPlan] {
}

// As same as tumbling window, we add a filter to filter out nulls.
// And we also filter out events with negative or zero gap duration.
// And we also filter out events with negative or zero or invalid gap duration.
val filterExpr = IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,34 @@ class DataFrameSessionWindowingSuite extends QueryTest with SharedSparkSession
}
}

test("SPARK-36465: filter out events with invalid gap duration.") {
val df = Seq(
("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")

checkAnswer(
df.groupBy(session_window($"time", "x sec"))
.agg(count("*").as("counts"))
.orderBy($"session_window.start".asc)
.select($"session_window.start".cast("string"), $"session_window.end".cast("string"),
$"counts"),
Seq()
)

withTempTable { table =>
checkAnswer(
spark.sql("select session_window(time, " +
"""case when value = 1 then "2 seconds" when value = 2 then "invalid gap duration" """ +
s"""else "20 seconds" end), value from $table""")
.select($"session_window.start".cast(StringType), $"session_window.end".cast(StringType),
$"value"),
Seq(
Row("2016-03-27 19:39:27", "2016-03-27 19:39:47", 4),
Row("2016-03-27 19:39:34", "2016-03-27 19:39:36", 1)
)
)
}
}

test("SPARK-36724: Support timestamp_ntz as a type of time column for SessionWindow") {
val df = Seq((LocalDateTime.parse("2016-03-27T19:39:30"), 1, "a"),
(LocalDateTime.parse("2016-03-27T19:39:25"), 2, "a")).toDF("time", "value", "id")
Expand Down

0 comments on commit 715a06c

Please sign in to comment.