From 1b03419998b1b97b441eb126f8cb6f391c663f4a Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Wed, 28 Aug 2024 17:54:01 +0300 Subject: [PATCH] Update FlowAction API. (#373) * Update FlowAction API. * Fix test issues. --- .../formula/coroutines/FlowAction.kt | 62 +++++++++---------- .../formula/test/CoroutineTestableRuntime.kt | 43 ++++++------- 2 files changed, 50 insertions(+), 55 deletions(-) diff --git a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowAction.kt b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowAction.kt index a19315ea..9888386e 100644 --- a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowAction.kt +++ b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowAction.kt @@ -2,11 +2,14 @@ package com.instacart.formula.coroutines import com.instacart.formula.Action import com.instacart.formula.Cancelable -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.MainScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * Adapter which allows creating Formula [Action] from Kotlin coroutine's. Take a @@ -24,20 +27,10 @@ interface FlowAction : Action { * } * ``` */ - inline fun fromFlow( - scope: CoroutineScope = MainScope(), - crossinline create: () -> Flow + fun fromFlow( + create: () -> Flow ): Action { - return object : FlowAction { - - override val scope: CoroutineScope = scope - - override fun flow(): Flow { - return create() - } - - override fun key(): Any = Unit - } + return FlowActionImpl(null, create) } /** @@ -51,31 +44,32 @@ interface FlowAction : Action { * * @param key Used to distinguish this [Action] from other actions. */ - inline fun fromFlow( - scope: CoroutineScope = MainScope(), + fun fromFlow( key: Any?, - crossinline create: () -> Flow + create: () -> Flow ): Action { - return object : FlowAction { - override val scope: CoroutineScope = scope - - override fun flow(): Flow { - return create() - } - - override fun key(): Any? = key - } + return FlowActionImpl(key, create) } } fun flow(): Flow - val scope: CoroutineScope - + @OptIn(DelicateCoroutinesApi::class) override fun start(send: (Event) -> Unit): Cancelable? { - val job = flow() - .onEach { send(it) } - .launchIn(scope) + val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { + withContext(Dispatchers.Unconfined) { + flow().collect { send(it) } + } + } return Cancelable(job::cancel) } +} + +private data class FlowActionImpl( + private val key: Any?, + private val factory: () -> Flow +) : FlowAction { + override fun flow(): Flow = factory() + + override fun key(): Any? = key } \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/test/CoroutineTestableRuntime.kt b/formula/src/test/java/com/instacart/formula/test/CoroutineTestableRuntime.kt index c931b518..9ba74fb0 100644 --- a/formula/src/test/java/com/instacart/formula/test/CoroutineTestableRuntime.kt +++ b/formula/src/test/java/com/instacart/formula/test/CoroutineTestableRuntime.kt @@ -10,7 +10,11 @@ import com.instacart.formula.coroutines.toFlow import com.instacart.formula.plugin.Dispatcher import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -18,11 +22,13 @@ import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineDispatcher import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.resetMain import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.test.setMain +import kotlinx.coroutines.withContext import org.junit.rules.TestRule import org.junit.rules.TestWatcher import org.junit.runner.Description @@ -70,20 +76,30 @@ object CoroutinesTestableRuntime : TestableRuntime { } private class FlowRelay : Relay { - private val sharedFlow = MutableSharedFlow(0, 1) + private val sharedFlow = MutableSharedFlow(0, 0) override fun action(): Action = FlowAction.fromFlow { sharedFlow } + @OptIn(DelicateCoroutinesApi::class) override fun triggerEvent() { - sharedFlow.tryEmit(Unit) + GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { + withContext(Dispatchers.Unconfined) { + sharedFlow.emit(Unit) + } + } } } private class FlowStreamFormulaSubject : FlowFormula(), StreamFormulaSubject { - private val sharedFlow = MutableSharedFlow(0, extraBufferCapacity = 1, BufferOverflow.DROP_OLDEST) + private val sharedFlow = MutableSharedFlow(0, extraBufferCapacity = 0) + @OptIn(DelicateCoroutinesApi::class) override fun emitEvent(event: Int) { - sharedFlow.tryEmit(event) + GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { + withContext(Dispatchers.Unconfined) { + sharedFlow.emit(event) + } + } } override fun initialValue(input: String): Int = 0 @@ -133,28 +149,13 @@ private class CoroutineTestDelegate