Skip to content

Commit

Permalink
Merge branch 'sgleadow-rx-blocking-remove-duplicate-code' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Aug 1, 2017
2 parents 86dfc7d + 5559eb5 commit f6dacb2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 178 deletions.
195 changes: 44 additions & 151 deletions RxBlocking/BlockingObservable+Operators.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,7 @@ extension BlockingObservable {
///
/// - returns: All elements of sequence.
public func toArray() throws -> [E] {
var elements: [E] = Array<E>()

var error: Swift.Error?

let lock = RunLoopLock(timeout: timeout)

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

lock.dispatch {
let subscription = self.source.subscribe { e in
if d.isDisposed {
return
}
switch e {
case .next(let element):
elements.append(element)
case .error(let e):
error = e
d.dispose()
lock.stop()
case .completed:
d.dispose()
lock.stop()
}
}

d.setDisposable(subscription)
}

try lock.run()

if let error = error {
throw error
}

return elements
return try convertToArray()
}
}

Expand All @@ -67,50 +28,7 @@ extension BlockingObservable {
///
/// - returns: First element of sequence. If sequence is empty `nil` is returned.
public func first() throws -> E? {
var element: E?

var error: Swift.Error?

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
let subscription = self.source.subscribe { e in
if d.isDisposed {
return
}

switch e {
case .next(let e):
if element == nil {
element = e
}
break
case .error(let e):
error = e
default:
break
}

d.dispose()
lock.stop()
}

d.setDisposable(subscription)
}

try lock.run()

if let error = error {
throw error
}

return element
return try convertToArray(max: 1).first
}
}

Expand All @@ -121,47 +39,7 @@ extension BlockingObservable {
///
/// - returns: Last element in the sequence. If sequence is empty `nil` is returned.
public func last() throws -> E? {
var element: E?

var error: Swift.Error?

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
let subscription = self.source.subscribe { e in
if d.isDisposed {
return
}
switch e {
case .next(let e):
element = e
return
case .error(let e):
error = e
default:
break
}

d.dispose()
lock.stop()
}

d.setDisposable(subscription)
}

try lock.run()

if let error = error {
throw error
}

return element
return try convertToArray().last
}
}

Expand All @@ -182,61 +60,76 @@ extension BlockingObservable {
/// - parameter predicate: A function to test each source element for a condition.
/// - returns: Returns the only element of an sequence that satisfies the condition in the predicate, and reports an error if there is not exactly one element in the sequence.
public func single(_ predicate: @escaping (E) throws -> Bool) throws -> E? {
var element: E?
let elements = try convertToArray(max: 2, predicate: predicate)

switch elements.count {
case 0:
throw RxError.noElements
case 1:
return elements.first
default:
throw RxError.moreThanOneElement
}
}
}

extension BlockingObservable {
fileprivate func convertToArray(max: Int? = nil, predicate: @escaping (E) throws -> Bool = { _ in true }) throws -> [E] {
var elements: [E] = Array<E>()

var error: Swift.Error?

let lock = RunLoopLock(timeout: timeout)

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
let subscription = self.source.subscribe { e in
let subscription = self.source.subscribe { event in
if d.isDisposed {
return
}
switch e {
case .next(let e):
switch event {
case .next(let element):
do {
if try !predicate(e) {
return
if try predicate(element) {
elements.append(element)
}
if element == nil {
element = e
} else {
throw RxError.moreThanOneElement
if let max = max, elements.count >= max {
d.dispose()
lock.stop()
}
} catch (let err) {
error = err
d.dispose()
lock.stop()
}
return
case .error(let e):
error = e
case .error(let err):
error = err
d.dispose()
lock.stop()
case .completed:
if element == nil {
error = RxError.noElements
}
d.dispose()
lock.stop()
}

d.dispose()
lock.stop()
}

d.setDisposable(subscription)
}

try lock.run()

do {
try lock.run()
} catch (let err) {
error = err
}

if let error = error {
throw error
}

return element
return elements
}
}
9 changes: 9 additions & 0 deletions RxBlocking/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ extension ObservableType {
extension ObservableType {
public func last() throws -> E? {}
}

extension Observableype {
public func single() throws -> E? {}
public func single(_ predicate: @escaping (E) throws -> Bool) throws -> E? {}
}

extension ObservableType {
public func toError() -> Swift.Error? {}
}
```


Loading

0 comments on commit f6dacb2

Please sign in to comment.