-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathrx.go
81 lines (70 loc) · 2.75 KB
/
rx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package rx
import (
"context"
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/payload"
)
func init() {
hooks.OnNextDrop(func(v reactor.Any) {
common.TryRelease(v)
})
hooks.OnErrorDrop(func(e error) {
common.TryRelease(e)
})
}
// RequestMax represents unbounded request amount.
const RequestMax = reactor.RequestInfinite
const (
// SignalComplete indicated that subscriber was completed.
SignalComplete = SignalType(reactor.SignalTypeComplete)
// SignalCancel indicates that subscriber was cancelled.
SignalCancel = SignalType(reactor.SignalTypeCancel)
// SignalError indicates that subscriber has some faults.
SignalError = SignalType(reactor.SignalTypeError)
)
type (
// FnOnComplete is alias of function for signal when no more elements are available
FnOnComplete = func()
// FnOnNext is alias of function for signal when next element arrived.
FnOnNext = func(input payload.Payload) error
// FnOnSubscribe is alias of function for signal when subscribe begin.
FnOnSubscribe = func(ctx context.Context, s Subscription)
// FnOnError is alias of function for signal when an error occurred.
FnOnError = func(e error)
// FnOnCancel is alias of function for signal when subscription canceled.
FnOnCancel = func()
// FnFinally is alias of function for signal when all things done.
FnFinally = func(s SignalType)
// FnPredicate is alias of function for filter operations.
FnPredicate = func(input payload.Payload) bool
// FnOnRequest is alias of function for signal when requesting next element.
FnOnRequest = func(n int)
// FnTransform is alias of function to transform a payload to another.
FnTransform = func(payload.Payload) (payload.Payload, error)
)
// RawPublisher represents a basic Publisher which can be subscribed by a Subscriber.
type RawPublisher interface {
// SubscribeWith can be used to subscribe current publisher.
SubscribeWith(ctx context.Context, s Subscriber)
}
// Publisher is a provider of a potentially unbounded number of sequenced elements, \
// publishing them according to the demand received from its Subscriber(s).
type Publisher interface {
RawPublisher
// Subscribe subscribe elements from a publisher, returns a Disposable.
// You can add some custom options.
// Using `OnSubscribe`, `OnNext`, `OnComplete` and `OnError` as handler wrapper.
Subscribe(ctx context.Context, options ...SubscriberOption)
}
// SignalType is the signal of reactive events like `OnNext`, `OnComplete`, `OnCancel` and `OnError`.
type SignalType reactor.SignalType
func (s SignalType) String() string {
return reactor.SignalType(s).String()
}
// Item is a kind of container which contains value or error.
type Item struct {
V payload.Payload
E error
}