forked from ReactiveX/RxPY
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduleddisposable.py
38 lines (26 loc) · 1.12 KB
/
scheduleddisposable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from threading import RLock
from typing import Any
from reactivex import abc
from .singleassignmentdisposable import SingleAssignmentDisposable
class ScheduledDisposable(abc.DisposableBase):
    """Disposable wrapper whose inner disposal is run via a scheduler.

    Calling :meth:`dispose` does not dispose the wrapped resource
    directly; it hands a dispose action to ``scheduler.schedule`` and
    the inner disposable is disposed when that action is invoked.

    Attributes set by the constructor:
        scheduler: The scheduler the dispose action is handed to.
        disposable: A ``SingleAssignmentDisposable`` holding the
            disposable passed to the constructor.
        lock: An ``RLock`` created per instance (not used by any
            method in this class).
    """

    def __init__(
        self, scheduler: abc.SchedulerBase, disposable: abc.DisposableBase
    ) -> None:
        """Create a ScheduledDisposable bound to a scheduler.

        Args:
            scheduler: Scheduler on which the dispose action will be
                scheduled.
            disposable: The disposable to wrap; it is stored inside a
                fresh ``SingleAssignmentDisposable``.
        """
        # Build the holder first, then publish it on the instance.
        holder = SingleAssignmentDisposable()
        holder.disposable = disposable

        self.scheduler = scheduler
        self.disposable = holder
        self.lock = RLock()

        super().__init__()

    @property
    def is_disposed(self) -> bool:
        """Report the ``is_disposed`` state of the inner holder."""
        return self.disposable.is_disposed

    def dispose(self) -> None:
        """Schedule disposal of the wrapped disposable on the scheduler.

        Every call schedules a new action; the action itself simply
        calls ``dispose()`` on the inner holder.
        """

        def dispose_inner(_scheduler: abc.SchedulerBase, _state: Any) -> None:
            """Action run by the scheduler: dispose the inner holder."""
            self.disposable.dispose()

        self.scheduler.schedule(dispose_inner)