Skip to content

Commit

Permalink
Fix potential for deadlocks in Worker binding.
Browse files Browse the repository at this point in the history
Removes nested unconfined lifecycle flow collection.
  • Loading branch information
psteiger committed May 12, 2023
1 parent b06d260 commit 4a59f54
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,16 @@

package com.uber.rib.core

import com.uber.autodispose.ScopeProvider
import com.uber.autodispose.lifecycle.LifecycleEndedException
import com.uber.autodispose.lifecycle.LifecycleNotStartedException
import io.reactivex.CompletableSource
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.rx2.rxCompletable

/**
* Converts a [SharedFlow] of lifecycle events into a [ScopeProvider]. See [asScopeCompletable] for
* constraints.
*/
internal fun <T : Comparable<T>> SharedFlow<T>.asScopeProvider(
range: ClosedRange<T>,
context: CoroutineContext = EmptyCoroutineContext,
): ScopeProvider = asScopeCompletable(range, context).asScopeProvider()

/**
* Converts a [SharedFlow] of lifecycle events into a [CompletableSource] that completes once the
* flow emits the ending event.
Expand All @@ -50,7 +41,9 @@ internal fun <T : Comparable<T>> SharedFlow<T>.asScopeCompletable(
context: CoroutineContext = EmptyCoroutineContext,
): CompletableSource {
ensureAlive(range)
return rxCompletable(RibDispatchers.Unconfined + context) { first { it == range.endInclusive } }
return rxCompletable(RibDispatchers.Unconfined + context) {
takeWhile { it < range.endInclusive }.collect()
}
}

private fun <T : Comparable<T>> SharedFlow<T>.ensureAlive(range: ClosedRange<T>) {
Expand All @@ -60,5 +53,3 @@ private fun <T : Comparable<T>> SharedFlow<T>.ensureAlive(range: ClosedRange<T>)
lastEmitted >= range.endInclusive -> throw LifecycleEndedException()
}
}

private fun CompletableSource.asScopeProvider() = ScopeProvider { this }
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.uber.rib.core

import androidx.annotation.VisibleForTesting
import com.uber.autodispose.ScopeProvider
import com.uber.autodispose.lifecycle.LifecycleScopeProvider
import com.uber.rib.core.lifecycle.InteractorEvent
import com.uber.rib.core.lifecycle.PresenterEvent
import com.uber.rib.core.lifecycle.WorkerEvent
import io.reactivex.Observable
import io.reactivex.subjects.CompletableSubject
import java.lang.ref.WeakReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
Expand Down Expand Up @@ -311,6 +313,10 @@ private fun <T : Comparable<T>> Worker.bind(
CoroutineStart.DEFAULT
}

val completable = CompletableSubject.create()
val scopeProvider = ScopeProvider { completable }
val workerScopeProvider = WorkerScopeProvider(scopeProvider)

/*
* We need `Dispatchers.Unconfined` to react immediately to lifecycle flow emissions, and we need
* `CoroutineStart.Undispatched` to prevent coroutines launched in `onStart` with `Dispatchers.Unconfined`
Expand All @@ -327,6 +333,7 @@ private fun <T : Comparable<T>> Worker.bind(
lifecycle
.takeWhile { it < lifecycleRange.endInclusive }
.onCompletion {
completable.onComplete()
bindAndReportWorkerInfo(
workerDurationListenerWeakRef,
WorkerEvent.STOP,
Expand All @@ -341,7 +348,6 @@ private fun <T : Comparable<T>> Worker.bind(
WorkerEvent.START,
coroutineContext,
) {
val workerScopeProvider = WorkerScopeProvider(lifecycle.asScopeProvider(lifecycleRange))
onStart(workerScopeProvider)
}
}
Expand Down

0 comments on commit 4a59f54

Please sign in to comment.