forked from ReactiveX/RxPY
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnectableobservable.py
82 lines (62 loc) · 2.69 KB
/
connectableobservable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import List, Optional, TypeVar
from reactivex import abc
from reactivex.disposable import CompositeDisposable, Disposable
from .observable import Observable
_T = TypeVar("_T")
class ConnectableObservable(Observable[_T]):
"""Represents an observable that can be connected and
disconnected."""
def __init__(self, source: abc.ObservableBase[_T], subject: abc.SubjectBase[_T]):
self.subject = subject
self.has_subscription = False
self.subscription: Optional[abc.DisposableBase] = None
self.source = source
super().__init__()
def _subscribe_core(
self,
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
return self.subject.subscribe(observer, scheduler=scheduler)
def connect(
self, scheduler: Optional[abc.SchedulerBase] = None
) -> Optional[abc.DisposableBase]:
"""Connects the observable."""
if not self.has_subscription:
self.has_subscription = True
def dispose() -> None:
self.has_subscription = False
subscription = self.source.subscribe(self.subject, scheduler=scheduler)
self.subscription = CompositeDisposable(subscription, Disposable(dispose))
return self.subscription
def auto_connect(self, subscriber_count: int = 1) -> Observable[_T]:
"""Returns an observable sequence that stays connected to the
source indefinitely to the observable sequence.
Providing a subscriber_count will cause it to connect() after
that many subscriptions occur. A subscriber_count of 0 will
result in emissions firing immediately without waiting for
subscribers.
"""
connectable_subscription: List[Optional[abc.DisposableBase]] = [None]
count = [0]
source = self
is_connected = [False]
if subscriber_count == 0:
connectable_subscription[0] = source.connect()
is_connected[0] = True
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
count[0] += 1
should_connect = count[0] == subscriber_count and not is_connected[0]
subscription = source.subscribe(observer)
if should_connect:
connectable_subscription[0] = source.connect(scheduler)
is_connected[0] = True
def dispose() -> None:
subscription.dispose()
count[0] -= 1
is_connected[0] = False
return Disposable(dispose)
return Observable(subscribe)