-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathClean.scala
122 lines (86 loc) · 3.75 KB
/
Clean.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package zio.spark.wrap
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import zio._
import zio.spark._
import zio.spark.wrap.Clean.Aux
import scala.util.Try
/** Typeclass that converts a raw (impure) Spark value of type `A` into its
  * zio-spark wrapper. The wrapped result type is the abstract member `Out`,
  * refined per instance through the `Clean.Aux[A, B]` alias.
  */
trait Clean[A] {
  /** The wrapped ("clean") type produced for an `A`. */
  type Out
  @inline
  def apply(a: A): Out
}
/** Lowest-priority fallback: a type with no dedicated `Clean` instance is
  * wrapped in an anonymous [[Impure]] subclass, so the raw value can only be
  * reached through the effectful `execute*` API.
  */
sealed trait LowPriorityClean1 {
  implicit final def _any[T]: Clean.Aux[T, Impure[T]] =
    new Clean[T] {
      override type Out = Impure[T]
      // Impure is abstract, hence the anonymous-subclass instantiation.
      override def apply(raw: T): Out = new Impure[T](raw) {}
    }
}
/** Low-priority instance: wraps any `Dataset[T]` into a `ZDataset[T]`.
  * Sits above the generic `_any` fallback in the implicit search order.
  */
sealed trait LowPriorityClean0 extends LowPriorityClean1 {
  implicit final def _dataset[T]: Aux[Dataset[T], ZDataset[T]] = Clean.impure(new ZDataset(_))
}
object Clean extends LowPriorityClean0 {

  /** Aux pattern: a `Clean[A]` whose output member is pinned to `B`. */
  type Aux[A, B] =
    Clean[A] {
      type Out = B
    }

  /** Identity instance: the value is already clean and passes through. */
  type Pure[T] = Aux[T, T]

  /** Builds an identity instance for `T`. */
  final protected[spark] def pure[T]: Pure[T] =
    new Clean[T] {
      override type Out = T
      @inline
      override def apply(a: T): Out = a
    }

  /** Builds an instance from a wrapping function into an [[Impure]] subtype. */
  final protected[spark] def impure[A, B <: Impure[_]](f: A => B): Aux[A, B] =
    new Clean[A] {
      override type Out = B
      override def apply(raw: A): Out = f(raw)
    }

  // Primitive-like values are already pure.
  implicit val _string: Pure[String]   = pure
  implicit val _int: Pure[Int]         = pure
  implicit val _long: Pure[Long]       = pure
  implicit val _unit: Pure[Unit]       = pure
  implicit val _boolean: Pure[Boolean] = pure
  implicit val _row: Pure[Row]         = pure
  implicit val _column: Pure[Column]   = pure

  // Already-wrapped values need no further cleaning.
  implicit def _wrapped[T <: Impure[_]]: Pure[T] = pure

  // Spark handles get their dedicated zio-spark wrappers.
  implicit def _rdd[T]: Aux[RDD[T], ZRDD[T]]                     = impure(new ZRDD(_))
  implicit val _dataframe: Aux[DataFrame, ZDataFrame]            = impure(new ZDataFrame(_))
  implicit val _sparkSession: Aux[SparkSession, ZSparkSession]   = impure(new ZSparkSession(_))
  implicit val _sparkContext: Aux[SparkContext, ZSparkContext]   = impure(new ZSparkContext(_))

  /** Cleans each element of a `Seq` using the element instance. */
  implicit def _seq[A, B](implicit W: Aux[A, B]): Aux[Seq[A], Seq[B]] =
    new Clean[Seq[A]] {
      override type Out = Seq[B]
      override def apply(values: Seq[A]): Out = values.map(W.apply)
    }

  /** Cleans the content of an `Option` using the element instance. */
  implicit def _option[A, B](implicit W: Aux[A, B]): Aux[Option[A], Option[B]] =
    new Clean[Option[A]] {
      override type Out = Option[B]
      override def apply(a: Option[A]): Out = a.map(W.apply)
    }

  implicit val _relationalgroupeddataset: Aux[RelationalGroupedDataset, ZRelationalGroupedDataset] =
    impure(ZRelationalGroupedDataset(_))

  /** Summons the instance for `A`, keeping its refined `Out` member. */
  def apply[A](implicit C: Clean[A]): C.type = C

  // def apply[A](a: A)(implicit C: Clean[A]): C.Out = C(a)

  /** Suspends the (possibly side-effecting) computation of `a` in a `Task`
    * and cleans its result.
    */
  def effect[A](a: => A)(implicit C: Clean[A]): Task[C.Out] = Task(C(a))
}
/** Opaque wrapper around a raw Spark value `V`. The value is never exposed
  * directly: every access goes through one of the `execute*` methods, which
  * re-wrap the result via an implicit [[Clean]] instance.
  */
abstract class Impure[+V](private val value: V) {

  /** Runs `f` against the raw value inside a `Task`, cleaning its result.
    * @usecase def execute[B](f: V => B):Task[B]
    */
  final def execute[B, Pure](f: V => B)(implicit C: Clean.Aux[B, Pure]): Task[Pure] = Task(C(f(value)))

  /** Runs an effectful `f` against the raw value; the outer `Task` also
    * captures any exception thrown while *building* the effect.
    */
  final def executeM[R, B, Pure](f: V => RIO[R, B])(implicit C: Clean.Aux[B, Pure]): RIO[R, Pure] =
    Task(f(value).map(C.apply)).flatten

  /** Like `execute` for functions assumed not to throw (hence `UIO`).
    * NOTE(review): the result type parameter was named `C`, shadowing the
    * implicit parameter `C` and diverging from every sibling signature;
    * renamed to `Pure` for consistency (positional, so source-compatible).
    */
  final protected def executeSuccess[B, Pure](f: V => B)(implicit C: Clean.Aux[B, Pure]): UIO[Pure] = UIO(C(f(value)))

  /** Like `executeM` for effects whose construction cannot throw. */
  final protected def executeSuccessM[R, E, B, Pure](
      f: V => ZIO[R, E, B]
  )(implicit W: Clean.Aux[B, Pure]): ZIO[R, E, Pure] = f(value).map(W.apply)

  /** Synchronous variant: applies and cleans immediately, no effect wrapper. */
  final protected def executeSuccessNow[B, Pure](f: V => B)(implicit C: Clean.Aux[B, Pure]): Pure = C(f(value))

  /** Synchronous variant that captures exceptions in a `Try`. */
  final protected def executeNow[B, Pure](f: V => B)(implicit C: Clean.Aux[B, Pure]): Try[Pure] = Try(C(f(value)))
}
/** Effectful variant of [[Impure]]: the wrapped value is itself behind an
  * `RIO`, so every operation first runs `rio`, then delegates to the
  * resulting [[Impure]].
  */
abstract class ImpureF[R, V](final val rio: RIO[R, Impure[V]]) {

  /** Rebuilds this wrapper with the underlying value transformed by `f`. */
  protected def copy(f: V => V): ImpureF[R, V]

  @inline final def execute[B, Pure](f: V => B)(implicit C: Clean.Aux[B, Pure]): RIO[R, Pure] =
    rio.flatMap(_.execute(f))

  @inline final def executeM[B, Pure](f: V => RIO[R, B])(implicit C: Clean.Aux[B, Pure]): RIO[R, Pure] =
    rio.flatMap(_.executeM(f))
}