代码之家  ›  专栏  ›  技术社区  ›  adek111

Kotlin Flow:当Fragment不可见时取消订阅SharedFlow

  •  1
  • adek111  · 技术社区  · 4 年前

    我读过类似的话题,但找不到合适的答案:

    在我的 Repository 我感冒了 Flow 我想分享给2 Presenters / ViewModels 所以我的选择是使用 shareIn 操作员。

    让我们来看看Android文档的例子:

    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,  // e.g. CoroutineScope(Dispatchers.IO)?
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
    

    文档建议 externalScope 参数:

    用于共享流的CoroutinScope。这个作用域应该比任何消费者都活得更久,以便在需要时保持共享流的活力。

    然而,寻找如何停止订阅的答案 流量 ,第二个链接中投票最多的答案是:

    解决方案不是取消流,而是取消它启动的范围。

    对我来说,这些答案是矛盾的 SharedFlow 的案件。不幸的是,我 Presenter / ViewModel 即使在其 onCleared 被召唤。

    如何预防这种情况?这是我如何消费的一个例子 流量 在我的 节目主持人 / ViewModel :

    fun doSomethingUseful(): Flow<OtherModel> {
        return repository.latestNews.map(OtherModel)
    

    如果这有帮助的话,我正在使用MVI架构,所以 doSomethingUseful 对用户创建的某些意图做出反应。

    0 回复  |  直到 4 年前
        1
  •  5
  •   adek111    4 年前

    感谢Mark Keen的评论和帖子,我认为我取得了令人满意的结果。

    我已经理解了中定义的范围 shareIn 参数不必与我的消费者操作的范围相同。更改范围 BasePresenter / BaseViewModel CoroutineScope viewModelScope 似乎解决了主要问题。您甚至不需要手动取消此范围,如中所定义 Android docs :

    init {
        viewModelScope.launch {
            // Coroutine that will be canceled when the ViewModel is cleared.
        }
    }
    

    请记住,违约 viewModelScope 调度员是 Main 这并不明显,也可能不是你想要的!要更改调度程序,请使用 viewModelScope.launch(YourDispatcher) .

    更重要的是,我的热 SharedFlow 由另一场感冒转变而来 Flow 创建于 callbackFlow 回调API(基于 Channels API-这很复杂…)

    将收集范围更改为 viewModelScope ,我得到了 ChildCancelledException: Child of the scoped flow was cancelled 从该API发出新数据时发生异常。这个问题在GitHub上的两个问题中都有很好的记录:

    如前所述,使用 offer send :

    offer用于非暂停上下文,send用于暂停上下文。

    不幸的是,就传播的异常而言,offer与发送是不对称的(通常忽略来自发送的CancellationException,而在nom挂起上下文中来自offer的CancellationExceptions则不会)。

    我们希望在974中通过offerOrClosed或更改offer语义来修复它

    至于1.4.2的Kotlin协程,#974还没有修复——我希望它能在不久的将来修复,以避免意外 CancellationException .

    最后,我建议玩 started 参数在 shareIn 操作员。在所有这些变化之后,我不得不改变 WhileSubscribed() Lazily 在我的用例中。

    如果我发现任何新信息,我会更新这篇文章。希望我的研究能节省一些人的时间。

        2
  •  3
  •   VIVEK CHOUDHARY    4 年前

    使用SharedFlow。在下面的示例中,我从一个片段中发出值,并在另一个片段上收集它。

    ViewModel:

    class MenuOptionsViewModel : ViewModel() {
    private val _option = MutableSharedFlow<String>()
    val option = _option.asSharedFlow()
    
    suspend fun setOption(o : String){
        _option.emit(o)
    }
    }
    

    碎片发射值:

    class BottomSheetOptionsFragment  : BottomSheetDialogFragment() , KodeinAware{
    
        override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        menuViewModel = activity?.run {
            ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
        } ?: throw Exception("Invalid Activity")
    
        listViewOptions.adapter = ArrayAdapter<String>(
            requireContext(),
            R.layout.menu_text_item,
            options
        )
    
        listViewOptions.setOnItemClickListener { adapterView, view, i, l ->
            val entry: String = listViewOptions.getAdapter().getItem(i) as String
    
    // here we are emitting values
            GlobalScope.launch { menuViewModel.setOption(entry) }
            Log.d(TAG, "emitting flow $entry")
            dismiss()
        }
    }
    }
    

    片段收集值:

    class DetailFragment : BaseFragment(), View.OnClickListener, KodeinAware,
    OnItemClickListener {
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
            super.onViewCreated(view, savedInstanceState)
            menuViewModel = activity?.run {
                ViewModelProviders.of(this).get(MenuOptionsViewModel::class.java)
            } ?: throw Exception("Invalid Activity")
    
    
    // collecting values
                    lifecycleScope.launchWhenStarted {
                menuViewModel.option.collect {
                    Log.d(TAG, "collecting flow $it")
                    
                }
            }
    }
    
        3
  •  2
  •   Mark    4 年前

    我试图用相关评论提供一个最小的例子。如前所述,SharedFlow的工作原理与 ConnectableObservable 在RxJava中。上游只会订阅一次,这意味着冷上游流量只会进行一次计算。你的存储库什么也不做,因为它是一个冷流,直到 SharedFlow 订阅,因此它没有作用域。

    使用RxJava和Flow有很多相似之处。似乎几乎没有必要创造 Flow Collector 如果基础Reactive Streams接口从扩展而来(但我不知道根本原因),那么开发人员本可以更容易地进行转换,也许他们希望通过新的api获得更大的灵活性,或者从Java 9实现和RxJava等另一个Reaction Streams实现中脱颖而出。

    class MyViewModel : ViewModel(), CoroutineScope {
    
        override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob() // optional + CoroutineExceptionHandler()
    
        private val latestNews: Flow<List<String>> = doSomethingUseful()
                .flowOn(Dispatchers.IO) // upstream will operate on this dispatch
                .shareIn(scope = this, // shared in this scope - becomes hot flow  (or use viewModelScope) for lifetime of your view model - will only connect to doSomethingUseful once for lifetime of scope
                         replay = 1,
                         started = SharingStarted.WhileSubscribed())
    
    
        fun connect() : Flow<List<String>> = latestNews // expose SharedFlow to "n" number of subscribers or same subscriber more than once
    
        override fun onCleared() {
            super.onCleared()
            cancel() // cancel the shared flow - this scope is finished
        }
    }
    
    class MainActivity : AppCompatActivity(), CoroutineScope {
    
        override val coroutineContext: CoroutineContext = Dispatchers.Main.immediate + SupervisorJob()
    
        private var job : Job? = null
    
        // supply the same view model instance on config changes for example - its scope is larger
        private val vm : MyViewModel by viewModels()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
        }
    
        override fun onStart() {
            super.onStart()
    
            job = launch {
                vm.connect().collect {
                    // observe latest emission of hot flow and subsequent emissions if any - either reconnect or connect for first time
                }
            }
        }
    
        override fun onStop() {
            super.onStop()
    
            // cancel the job but latest news is still "alive" and receives emissions as it is running in a larger scope of this scope
            job?.cancel()
        }
    
        override fun onDestroy() {
            super.onDestroy()
            // completely cancel this scope - the ViewModel scope is unaffected
            cancel()
        }
    }