Skip to content

Commit

Permalink
Add cleanCancelledChildren on CancellableManager
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesMcD committed Jan 14, 2025
1 parent 5ff725e commit c2f04e5
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package com.mirego.trikot.streams.cancellable

import com.mirego.trikot.foundation.concurrent.AtomicListReference
import com.mirego.trikot.foundation.concurrent.AtomicReference
import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue
import kotlinx.atomicfu.atomic

class CancellableManager : Cancellable {
class CancellableManager : Cancellable, VerifiableCancelledState {
private val serialQueue = SynchronousSerialQueue()
private val queueList = AtomicListReference<Cancellable>()
private val isCancelled = AtomicReference(false)

private var isCancelledDelegate = atomic(false)
override var isCancelled: Boolean by isCancelledDelegate

fun <T : Cancellable> add(cancellable: T): T {
queueList.add(cancellable)

if (isCancelled.value) {
if (isCancelled) {
doCancelAll()
}
return cancellable
Expand All @@ -28,8 +30,22 @@ class CancellableManager : Cancellable {
)
}

fun cleanCancelledChildren() {
serialQueue.dispatch {
val value = queueList.value
val alreadyCancelledList = value.filter {
when (it) {
is VerifiableCancelledState -> it.isCancelled
else -> false
}
}

queueList.removeAll(alreadyCancelledList)
}
}

override fun cancel() {
if (isCancelled.compareAndSet(isCancelled.value, true)) {
if (isCancelledDelegate.compareAndSet(isCancelledDelegate.value, true)) {
doCancelAll()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package com.mirego.trikot.streams.cancellable

import com.mirego.trikot.foundation.concurrent.AtomicReference
import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue
import kotlinx.atomicfu.atomic

class CancellableManagerProvider : Cancellable {
class CancellableManagerProvider : Cancellable, VerifiableCancelledState {
private val serialQueue = SynchronousSerialQueue()
private val internalCancellableManagerRef = AtomicReference(CancellableManager())
private val isCancelled = AtomicReference(false)

private val isCancelledDelegate = atomic(false)
override val isCancelled: Boolean by isCancelledDelegate

fun cancelPreviousAndCreate(): CancellableManager {
return CancellableManager().also { cancellableManager ->
internalCancellableManagerRef.getAndSet(cancellableManager).cancel()
serialQueue.dispatch {
if (isCancelled.value) {
if (isCancelled) {
cancellableManager.cancel()
}
}
Expand All @@ -21,7 +24,7 @@ class CancellableManagerProvider : Cancellable {

override fun cancel() {
serialQueue.dispatch {
if (isCancelled.compareAndSet(isCancelled.value, true)) {
if (isCancelledDelegate.compareAndSet(isCancelledDelegate.value, true)) {
internalCancellableManagerRef.value.cancel()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.mirego.trikot.streams.cancellable

interface VerifiableCancelledState {
val isCancelled: Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

package com.mirego.trikot.streams.reactive.promise

import com.mirego.trikot.foundation.concurrent.AtomicReference
import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue
import com.mirego.trikot.streams.cancellable.Cancellable
import com.mirego.trikot.streams.cancellable.CancellableManager
import com.mirego.trikot.streams.cancellable.VerifiableCancelledState
import com.mirego.trikot.streams.reactive.BehaviorSubjectImpl
import com.mirego.trikot.streams.reactive.Publishers
import com.mirego.trikot.streams.reactive.observeOn
import com.mirego.trikot.streams.reactive.subscribe
import kotlinx.atomicfu.atomic
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber

Expand All @@ -20,50 +22,68 @@ class Promise<T> internal constructor(
private val serialQueue = SynchronousSerialQueue()
private val result = BehaviorSubjectImpl<T>(serialQueue = serialQueue)

private val isCancelled: AtomicReference<Boolean> = AtomicReference(false)
private val internalCancellableManager: CancellableManager = CancellableManager().also {
cancellableManager?.add(it)
private val internalCancellableManager: CancellableManager = CancellableManager()

private val onParentCancellation = object : Cancellable, VerifiableCancelledState {
override var isCancelled: Boolean by atomic(false)

override fun cancel() {
serialQueue.dispatch {
isCancelled = true
internalCancellableManager.cancel()

if (result.value == null && result.error == null) {
result.error = CancelledPromiseException
}
}
}
}

/**
* When a result is received, we want to make sure we're considered as "cancelled" by the provided cancellableManager if any.
* This allows to use `CancellableManager::cleanCancelledChildren` with promises to clean up finished promises
* and avoids keeping a reference on the promise, which is finished anyway. No further results will be emitted.
*
* We don't want the `onParentCancellation::cancel()` to run since the provided cancellableManager was not cancelled,
* so we don't call `cancel()` directly, but instead modify the `isCancelled` value manually.
*/
private fun onResultReceived() {
onParentCancellation.isCancelled = true
internalCancellableManager.cancel()
}

init {
cancellableManager?.add(onParentCancellation)

upstream
.observeOn(serialQueue)
.subscribe(
internalCancellableManager,
onNext = { value ->
if (!isCancelled.value) {
if (!onParentCancellation.isCancelled) {
result.value = value
result.complete()

internalCancellableManager.cancel()
onResultReceived()
}
},
onError = { error ->
if (!isCancelled.value) {
if (!onParentCancellation.isCancelled) {
result.error = error

internalCancellableManager.cancel()
onResultReceived()
}
},
onCompleted = {
if (!isCancelled.value) {
if (!onParentCancellation.isCancelled) {
if (result.value == null && result.error == null) {
result.error = EmptyPromiseException
}
internalCancellableManager.cancel()

onResultReceived()
}
}
)

cancellableManager?.add {
serialQueue.dispatch {
isCancelled.setOrThrow(false, true)

if (result.value == null && result.error == null) {
result.error = CancelledPromiseException
}
}
}
}

override fun subscribe(s: Subscriber<in T>) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.mirego.trikot.streams.cancellable

import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class CancellableManagerTests {
Expand All @@ -23,4 +25,94 @@ class CancellableManagerTests {

assertTrue { cancelled }
}

@Test
fun ifCleanCancelledChildrenIsNotCalled_thenAllChildAreCalledOnParentCancellation() {
val parentCancellableManager = CancellableManager()
val childList = mutableListOf<TestableCancellable>()
var cancelCounter = 0

for (i in 1..3) {
val cancellableManager = TestableCancellable(
onCancelCalled = { cancelCounter++ }
)

childList.add(cancellableManager)
parentCancellableManager.add(cancellableManager)
}

// We have 3 cancellable children
assertEquals(3, childList.size)

// Cancel first 2 children
childList[0].cancel()
childList[1].cancel()

assertEquals(2, cancelCounter)

parentCancellableManager.cancel()

// All 3 children are called, so 5 total calls
assertEquals(5, cancelCounter)
}

@Test
fun ifCleanCancelledChildrenIsCalled_thenOnlyCancelRemainingChildOnParentCancellation() {
val parentCancellableManager = CancellableManager()
val childList = mutableListOf<TestableCancellable>()
var cancelCounter = 0

for (i in 1..3) {
val cancellableManager = TestableCancellable(
onCancelCalled = { cancelCounter++ }
)

childList.add(cancellableManager)
parentCancellableManager.add(cancellableManager)
}

// We have 3 cancellable children
assertEquals(3, childList.size)

var expectedCancelCalls = 0
assertEquals(expectedCancelCalls, cancelCounter)

// Cancel first 2 children
childList[0].cancel()
childList[1].cancel()

expectedCancelCalls += 2
assertEquals(2, cancelCounter)
assertTrue(childList[0].isCancelled)
assertTrue(childList[1].isCancelled)
assertFalse(childList[2].isCancelled)

parentCancellableManager.cleanCancelledChildren()

// Nothing has changed
assertEquals(expectedCancelCalls, cancelCounter)
assertTrue(childList[0].isCancelled)
assertTrue(childList[1].isCancelled)
assertFalse(childList[2].isCancelled)

parentCancellableManager.cancel()

// Only 1 child is called on parent cancellation since the other 2 have been cleaned
expectedCancelCalls++
assertEquals(expectedCancelCalls, cancelCounter)
assertTrue(childList[0].isCancelled)
assertTrue(childList[1].isCancelled)
assertTrue(childList[2].isCancelled)
}

private class TestableCancellable(
private val onCancelCalled: (() -> Unit)? = null
) : Cancellable, VerifiableCancelledState {
override var isCancelled = false

override fun cancel() {
isCancelled = true
onCancelCalled?.invoke()
}
}
}

0 comments on commit c2f04e5

Please sign in to comment.