From c2f04e59178038e09cbd4c506ac0b816f61eca1b Mon Sep 17 00:00:00 2001 From: Charles MacDuff Date: Tue, 14 Jan 2025 14:39:28 -0500 Subject: [PATCH] Add cleanCancelledChildren on CancellableManager --- .../streams/cancellable/CancellableManager.kt | 26 +++++- .../cancellable/CancellableManagerProvider.kt | 11 ++- .../cancellable/VerifiableCancelledState.kt | 5 + .../streams/reactive/promise/Promise.kt | 60 ++++++++---- .../cancellable/CancellableManagerTests.kt | 92 +++++++++++++++++++ 5 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/VerifiableCancelledState.kt diff --git a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManager.kt b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManager.kt index 2f3db7825..58244c4a7 100644 --- a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManager.kt +++ b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManager.kt @@ -1,18 +1,20 @@ package com.mirego.trikot.streams.cancellable import com.mirego.trikot.foundation.concurrent.AtomicListReference -import com.mirego.trikot.foundation.concurrent.AtomicReference import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue +import kotlinx.atomicfu.atomic -class CancellableManager : Cancellable { +class CancellableManager : Cancellable, VerifiableCancelledState { private val serialQueue = SynchronousSerialQueue() private val queueList = AtomicListReference() - private val isCancelled = AtomicReference(false) + + private var isCancelledDelegate = atomic(false) + override var isCancelled: Boolean by isCancelledDelegate fun add(cancellable: T): T { queueList.add(cancellable) - if (isCancelled.value) { + if (isCancelled) { doCancelAll() } return cancellable @@ -28,8 +30,22 @@ class CancellableManager : Cancellable { ) } + fun cleanCancelledChildren() { + serialQueue.dispatch { + val value = queueList.value + val alreadyCancelledList = value.filter { + when (it) { + is VerifiableCancelledState -> it.isCancelled + else -> false + } + } + + queueList.removeAll(alreadyCancelledList) + } + } + override fun cancel() { - if (isCancelled.compareAndSet(isCancelled.value, true)) { + if (isCancelledDelegate.compareAndSet(isCancelledDelegate.value, true)) { doCancelAll() } } diff --git a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerProvider.kt b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerProvider.kt index ae521ef4e..9fd4e1bb6 100644 --- a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerProvider.kt +++ b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerProvider.kt @@ -2,17 +2,20 @@ package com.mirego.trikot.streams.cancellable import com.mirego.trikot.foundation.concurrent.AtomicReference import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue +import kotlinx.atomicfu.atomic -class CancellableManagerProvider : Cancellable { +class CancellableManagerProvider : Cancellable, VerifiableCancelledState { private val serialQueue = SynchronousSerialQueue() private val internalCancellableManagerRef = AtomicReference(CancellableManager()) - private val isCancelled = AtomicReference(false) + + private val isCancelledDelegate = atomic(false) + override val isCancelled: Boolean by isCancelledDelegate fun cancelPreviousAndCreate(): CancellableManager { return CancellableManager().also { cancellableManager -> internalCancellableManagerRef.getAndSet(cancellableManager).cancel() serialQueue.dispatch { - if (isCancelled.value) { + if (isCancelled) { cancellableManager.cancel() } } @@ -21,7 +24,7 @@ class CancellableManagerProvider : Cancellable { override fun cancel() { serialQueue.dispatch { - if (isCancelled.compareAndSet(isCancelled.value, true)) { + if (isCancelledDelegate.compareAndSet(isCancelledDelegate.value, true)) { internalCancellableManagerRef.value.cancel() } } diff --git a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/VerifiableCancelledState.kt b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/VerifiableCancelledState.kt new file mode 100644 index 000000000..40ac8d8eb --- /dev/null +++ b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/cancellable/VerifiableCancelledState.kt @@ -0,0 +1,5 @@ +package com.mirego.trikot.streams.cancellable + +interface VerifiableCancelledState { + val isCancelled: Boolean +} diff --git a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/reactive/promise/Promise.kt b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/reactive/promise/Promise.kt index ff1f23e29..df639905e 100644 --- a/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/reactive/promise/Promise.kt +++ b/trikot-streams/streams/src/commonMain/kotlin/com/mirego/trikot/streams/reactive/promise/Promise.kt @@ -2,13 +2,15 @@ package com.mirego.trikot.streams.reactive.promise -import com.mirego.trikot.foundation.concurrent.AtomicReference import com.mirego.trikot.foundation.concurrent.dispatchQueue.SynchronousSerialQueue +import com.mirego.trikot.streams.cancellable.Cancellable import com.mirego.trikot.streams.cancellable.CancellableManager +import com.mirego.trikot.streams.cancellable.VerifiableCancelledState import com.mirego.trikot.streams.reactive.BehaviorSubjectImpl import com.mirego.trikot.streams.reactive.Publishers import com.mirego.trikot.streams.reactive.observeOn import com.mirego.trikot.streams.reactive.subscribe +import kotlinx.atomicfu.atomic import org.reactivestreams.Publisher import org.reactivestreams.Subscriber @@ -20,50 +22,68 @@ class Promise internal constructor( private val serialQueue = SynchronousSerialQueue() private val result = BehaviorSubjectImpl(serialQueue = serialQueue) - private val isCancelled: AtomicReference = AtomicReference(false) - private val internalCancellableManager: CancellableManager = CancellableManager().also { - cancellableManager?.add(it) + private val internalCancellableManager: CancellableManager = CancellableManager() + + private val onParentCancellation = object : Cancellable, VerifiableCancelledState { + override var isCancelled: Boolean by atomic(false) + + override fun cancel() { + serialQueue.dispatch { + isCancelled = true + internalCancellableManager.cancel() + + if (result.value == null && result.error == null) { + result.error = CancelledPromiseException + } + } + } + } + + /** + * When a result is received, we want to make sure we're considered as "cancelled" by the provided cancellableManager if any. + * This allows to use `CancellableManager::cleanCancelledChildren` with promises to clean up finished promises + * and avoids keeping a reference on the promise, which is finished anyway. No further results will be emitted. + * + * We don't want the `onParentCancellation::cancel()` to run since the provided cancellableManager was not cancelled, + * so we don't call `cancel()` directly, but instead modify the `isCancelled` value manually. + */ + private fun onResultReceived() { + onParentCancellation.isCancelled = true + internalCancellableManager.cancel() } init { + cancellableManager?.add(onParentCancellation) + upstream .observeOn(serialQueue) .subscribe( internalCancellableManager, onNext = { value -> - if (!isCancelled.value) { + if (!onParentCancellation.isCancelled) { result.value = value result.complete() - internalCancellableManager.cancel() + onResultReceived() } }, onError = { error -> - if (!isCancelled.value) { + if (!onParentCancellation.isCancelled) { result.error = error - internalCancellableManager.cancel() + onResultReceived() } }, onCompleted = { - if (!isCancelled.value) { + if (!onParentCancellation.isCancelled) { if (result.value == null && result.error == null) { result.error = EmptyPromiseException } - internalCancellableManager.cancel() + + onResultReceived() } } ) - - cancellableManager?.add { - serialQueue.dispatch { - isCancelled.setOrThrow(false, true) - - if (result.value == null && result.error == null) { - result.error = CancelledPromiseException - } - } - } } override fun subscribe(s: Subscriber) { diff --git a/trikot-streams/streams/src/commonTest/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerTests.kt b/trikot-streams/streams/src/commonTest/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerTests.kt index 614f5d186..8ca4d5ae5 100644 --- a/trikot-streams/streams/src/commonTest/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerTests.kt +++ b/trikot-streams/streams/src/commonTest/kotlin/com/mirego/trikot/streams/cancellable/CancellableManagerTests.kt @@ -1,6 +1,8 @@ package com.mirego.trikot.streams.cancellable import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertTrue class CancellableManagerTests { @@ -23,4 +25,94 @@ class CancellableManagerTests { assertTrue { cancelled } } + + @Test + fun ifCleanCancelledChildrenIsNotCalled_thenAllChildAreCalledOnParentCancellation() { + val parentCancellableManager = CancellableManager() + val childList = mutableListOf() + var cancelCounter = 0 + + for (i in 1..3) { + val cancellableManager = TestableCancellable( + onCancelCalled = { cancelCounter++ } + ) + + childList.add(cancellableManager) + parentCancellableManager.add(cancellableManager) + } + + // We have 3 cancellable children + assertEquals(3, childList.size) + + // Cancel first 2 children + childList[0].cancel() + childList[1].cancel() + + assertEquals(2, cancelCounter) + + parentCancellableManager.cancel() + + // All 3 children are called, so 5 total calls + assertEquals(5, cancelCounter) + } + + @Test + fun ifCleanCancelledChildrenIsCalled_thenOnlyCancelRemainingChildOnParentCancellation() { + val parentCancellableManager = CancellableManager() + val childList = mutableListOf() + var cancelCounter = 0 + + for (i in 1..3) { + val cancellableManager = TestableCancellable( + onCancelCalled = { cancelCounter++ } + ) + + childList.add(cancellableManager) + parentCancellableManager.add(cancellableManager) + } + + // We have 3 cancellable children + assertEquals(3, childList.size) + + var expectedCancelCalls = 0 + assertEquals(expectedCancelCalls, cancelCounter) + + // Cancel first 2 children + childList[0].cancel() + childList[1].cancel() + + expectedCancelCalls += 2 + assertEquals(2, cancelCounter) + assertTrue(childList[0].isCancelled) + assertTrue(childList[1].isCancelled) + assertFalse(childList[2].isCancelled) + + parentCancellableManager.cleanCancelledChildren() + + // Nothing has changed + assertEquals(expectedCancelCalls, cancelCounter) + assertTrue(childList[0].isCancelled) + assertTrue(childList[1].isCancelled) + assertFalse(childList[2].isCancelled) + + parentCancellableManager.cancel() + + // Only 1 child is called on parent cancellation since the other 2 have been cleaned + expectedCancelCalls++ + assertEquals(expectedCancelCalls, cancelCounter) + assertTrue(childList[0].isCancelled) + assertTrue(childList[1].isCancelled) + assertTrue(childList[2].isCancelled) + } + + private class TestableCancellable( + private val onCancelCalled: (() -> Unit)? = null + ) : Cancellable, VerifiableCancelledState { + override var isCancelled = false + + override fun cancel() { + isCancelled = true + onCancelCalled?.invoke() + } + } }