Skip to content

Commit

Permalink
ZIO Streams - improved documentation. (zio#3480)
Browse files Browse the repository at this point in the history
* ZIO Streams - improved documentation.

* Updated ZStream documentation based on zio#3079 change.
  • Loading branch information
michalwarecki authored Apr 30, 2020
1 parent de5276b commit 03e4dd2
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions docs/datatypes/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,86 @@ val streamFromIterable: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)

## Transforming a Stream

ZIO Stream supports many standard transforming functions like `map`, `partition`, `grouped`, `groupByKey`, `groupedWithin`
and many others. Here are examples of how to use them.

### map
```scala mdoc:silent
import zio.stream._

val intStream: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)
val stringStream: Stream[Nothing, String] = intStream.map(_.toString)
```
### partition
`partition` function splits the stream into tuple of streams based on predicate. The first stream contains all
element evaluated to true and the second one contains all element evaluated to false.
The faster stream may advance by up to `buffer` elements further than the slower one. Two streams are
wrapped by `ZManaged` type. In the example below, left stream consists of even numbers only.

```scala mdoc:silent
import zio._
import zio.stream._

val partitionResult: ZManaged[Any, Nothing, (ZStream[Any, Nothing, Int], ZStream[Any, Nothing, Int])] =
Stream
.fromIterable(0 to 100)
.partition(_ % 2 == 0, buffer = 50)
```

### grouped
To partition the stream results with the specified chunk size, you can use `grouped` function.

```scala mdoc:silent
import zio._
import zio.stream._

val groupedResult: ZStream[Any, Nothing, List[Int]] =
Stream
.fromIterable(0 to 100)
.grouped(50)
```

### groupByKey
To partition the stream by function result you can use `groupByKey` or `groupBy`. In the example below
exam results are grouped into buckets and counted.

```scala mdoc:silent
import zio._
import zio.stream._

case class Exam(person: String, score: Int)

val examResults = Seq(
Exam("Alex", 64),
Exam("Michael", 97),
Exam("Bill", 77),
Exam("John", 78),
Exam("Bobby", 71)
)

val groupByKeyResult: ZStream[Any, Nothing, (Int, Int)] =
Stream
.fromIterable(examResults)
.groupByKey(exam => exam.score / 10 * 10) {
case (k, s) => ZStream.fromEffect(s.runCollect.map(l => k -> l.size))
}
```

### groupedWithin
`groupedWithin` allows to group events by time or chunk size, whichever is satisfied first. In the example below
every chunk consists of 30 elements and is produced every 3 seconds.

```scala mdoc:silent
import zio._
import zio.stream._
import zio.duration._
import zio.clock.Clock

val groupedWithinResult: ZStream[Any with Clock, Nothing, List[Int]] =
Stream.fromIterable(0 to 10)
.repeat(Schedule.spaced(1 seconds))
.groupedWithin(30, 10 seconds)
```

## Consuming a Stream

Expand Down

0 comments on commit 03e4dd2

Please sign in to comment.