forked from ReactiveX/RxSwift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BlockingObservable+Operators.swift
170 lines (149 loc) · 5.72 KB
/
BlockingObservable+Operators.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//
// BlockingObservable+Operators.swift
// RxBlocking
//
// Created by Krunoslav Zaher on 10/19/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import RxSwift
/// Outcome of materializing a `BlockingObservable`.
///
/// Both cases carry the elements that were collected before the sequence stopped:
///
/// - `completed`: the sequence finished without an error being recorded.
/// - `failed`: an error was recorded; `elements` holds whatever had been
///   collected up to that point and `error` is the recorded error.
public enum MaterializedSequenceResult<T> {
    /// The sequence finished successfully after yielding `elements`.
    case completed(elements: [T])

    /// The sequence stopped with `error` after yielding `elements`.
    case failed(elements: [T], error: Error)
}
extension BlockingObservable {
    /// Blocks current thread until sequence terminates and gathers everything it produced.
    ///
    /// - returns: All elements of sequence, in the order they were collected.
    /// - throws: The error the sequence was materialized with, if it did not complete successfully.
    public func toArray() throws -> [E] {
        // Materialize without an element limit, then unwrap (or rethrow) in one step.
        return try elementsOrThrow(materializeResult())
    }
}
extension BlockingObservable {
    /// Blocks current thread until sequence produces its first element or terminates.
    ///
    /// - returns: First element of sequence, or `nil` if the sequence completed without producing any element.
    /// - throws: The error recorded before a first element was collected, if any.
    public func first() throws -> E? {
        // `max: 1` ends the blocking run as soon as a single element has been collected.
        let firstOnly = materializeResult(max: 1)
        let collected = try elementsOrThrow(firstOnly)
        return collected.first
    }
}
extension BlockingObservable {
    /// Blocks current thread until sequence terminates, then picks the final element.
    ///
    /// - returns: Last element in the sequence, or `nil` if the sequence is empty.
    /// - throws: The error the sequence was materialized with, if it did not complete successfully.
    public func last() throws -> E? {
        // No element limit: the whole sequence has to be collected to know which element is last.
        return try elementsOrThrow(materializeResult()).last
    }
}
extension BlockingObservable {
    /// Blocks current thread until sequence terminates or a second element has been collected.
    ///
    /// Equivalent to `single(_:)` with a predicate that accepts every element.
    ///
    /// - returns: The only element of the sequence.
    /// - throws: `RxError.noElements` if no element was collected, `RxError.moreThanOneElement` if more than one was,
    ///   or the error the sequence was materialized with.
    public func single() throws -> E {
        let acceptEverything: (E) throws -> Bool = { _ in true }
        return try single(acceptEverything)
    }

    /// Blocks current thread until sequence terminates or a second matching element has been collected.
    ///
    /// - parameter predicate: A function to test each source element for a condition.
    /// - returns: The only element of the sequence that satisfies the condition in the predicate.
    /// - throws: `RxError.noElements` if no element matched, `RxError.moreThanOneElement` if more than one matched,
    ///   or the error the sequence was materialized with (including an error thrown by `predicate`).
    public func single(_ predicate: @escaping (E) throws -> Bool) throws -> E {
        // Two matches are enough to know the "exactly one" contract is violated, so collection stops there.
        let outcome = materializeResult(max: 2, predicate: predicate)
        let matches = try elementsOrThrow(outcome)

        switch matches.count {
        case 0:
            throw RxError.noElements
        case 1:
            return matches[0]
        default:
            throw RxError.moreThanOneElement
        }
    }
}
extension BlockingObservable {
    /// Blocks current thread until sequence terminates and reports how it ended instead of throwing.
    ///
    /// The outcome is a `MaterializedSequenceResult` that records whether the sequence completed or failed,
    /// together with the elements collected up to that point.
    ///
    /// - returns: `.completed` with the list of elements on completion; `.failed` with the elements collected so far
    ///   and the error itself on error.
    public func materialize() -> MaterializedSequenceResult<E> {
        // Default arguments: no element limit, every element accepted.
        let outcome: MaterializedSequenceResult<E> = materializeResult()
        return outcome
    }
}
extension BlockingObservable {
    /// Subscribes to `source` and collects its elements while the calling thread waits in `lock.run()`.
    ///
    /// Elements for which `predicate` returns `true` are collected. The run is stopped when the sequence
    /// completes, when it emits an error, when `predicate` throws, or once `max` elements have been collected.
    /// An error thrown by `lock.run()` itself is recorded as the failure as well.
    ///
    /// NOTE(review): `RunLoopLock` is declared elsewhere; presumably `run()` blocks until `stop()` is called
    /// or `timeout` elapses — confirm against its implementation.
    ///
    /// - parameter max: Number of collected elements after which the run is stopped. `nil` (default) means no limit.
    /// - parameter predicate: Test applied to every element; only passing elements are collected. Accepts everything by default.
    /// - returns: `.failed` with the collected elements and the recorded error if an error was recorded,
    ///   otherwise `.completed` with the collected elements.
    fileprivate func materializeResult(max: Int? = nil, predicate: @escaping (E) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<E> {
        var collected: [E] = []
        var failure: Swift.Error?

        let lock = RunLoopLock(timeout: timeout)
        let subscriptionHolder = SingleAssignmentDisposable()

        // Whichever way this function exits, the subscription is disposed.
        defer {
            subscriptionHolder.dispose()
        }

        // Ends the run: record the failure (if any), dispose the subscription, then stop the lock — in that order.
        func finish(with reason: Swift.Error? = nil) {
            if let reason = reason {
                failure = reason
            }
            subscriptionHolder.dispose()
            lock.stop()
        }

        lock.dispatch {
            let subscription = self.source.subscribe { event in
                // Events arriving after the run has been finished are ignored.
                guard !subscriptionHolder.isDisposed else {
                    return
                }

                switch event {
                case .completed:
                    finish()
                case .error(let sequenceError):
                    finish(with: sequenceError)
                case .next(let element):
                    let accepted: Bool
                    do {
                        accepted = try predicate(element)
                    } catch {
                        // A throwing predicate ends the run just like a sequence error.
                        finish(with: error)
                        return
                    }

                    if accepted {
                        collected.append(element)
                    }
                    // The limit is checked after every element, whether or not it was collected.
                    if let limit = max, collected.count >= limit {
                        finish()
                    }
                }
            }

            subscriptionHolder.setDisposable(subscription)
        }

        do {
            try lock.run()
        } catch {
            failure = error
        }

        guard let recorded = failure else {
            return .completed(elements: collected)
        }
        return .failed(elements: collected, error: recorded)
    }

    /// Unwraps a materialized result into its elements.
    ///
    /// - parameter results: Result produced by `materializeResult`.
    /// - returns: The elements of a `.completed` result.
    /// - throws: The error stored in a `.failed` result; its partial elements are discarded.
    fileprivate func elementsOrThrow(_ results: MaterializedSequenceResult<E>) throws -> [E] {
        switch results {
        case let .completed(elements):
            return elements
        case let .failed(_, error):
            throw error
        }
    }
}