forked from ReactiveX/RxPY
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtyping.py
57 lines (50 loc) · 1.33 KB
/
typing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from threading import Thread
from typing import Callable, TypeVar, Union
from .abc.observable import Subscription
from .abc.observer import OnCompleted, OnError, OnNext
from .abc.periodicscheduler import (
ScheduledPeriodicAction,
ScheduledSingleOrPeriodicAction,
)
from .abc.scheduler import (
AbsoluteOrRelativeTime,
AbsoluteTime,
RelativeTime,
ScheduledAction,
)
from .abc.startable import StartableBase
_TState = TypeVar("_TState")
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
Action = Callable[[], None]
Mapper = Callable[[_T1], _T2]
MapperIndexed = Callable[[_T1, int], _T2]
Predicate = Callable[[_T1], bool]
PredicateIndexed = Callable[[_T1, int], bool]
Comparer = Callable[[_T1, _T1], bool]
SubComparer = Callable[[_T1, _T1], int]
Accumulator = Callable[[_TState, _T1], _TState]
Startable = Union[StartableBase, Thread]
StartableTarget = Callable[..., None]
StartableFactory = Callable[[StartableTarget], Startable]
__all__ = [
"Accumulator",
"AbsoluteTime",
"AbsoluteOrRelativeTime",
"Comparer",
"Mapper",
"MapperIndexed",
"OnNext",
"OnError",
"OnCompleted",
"Predicate",
"PredicateIndexed",
"RelativeTime",
"SubComparer",
"ScheduledPeriodicAction",
"ScheduledSingleOrPeriodicAction",
"ScheduledAction",
"Startable",
"StartableTarget",
"Subscription",
]