建立自己网站免费,手机网站设计平台,游戏开发课程,东莞凤岗网站建设制作文章目录 SharedFlow数据流的收集和事件订阅的区别launchIn() 和 shareIn() 的区别SharedFlow 与 Flow、Channel 的区别shareIn() 适用场景 shareIn() 的具体参数说明shareIn() 的 replay 参数shareIn() 的 started 参数WhileSubscribed() 的参数及适用场景 MutableSharedFlow、… 文章目录 SharedFlow数据流的收集和事件订阅的区别launchIn() 和 shareIn() 的区别SharedFlow 与 Flow、Channel 的区别shareIn() 适用场景 shareIn() 的具体参数说明shareIn() 的 replay 参数shareIn() 的 started 参数WhileSubscribed() 的参数及适用场景 MutableSharedFlow、asSharedFlow()MutableSharedFlowasSharedFlow() StateFlow、MutableStateFlow、asStateFlow()总结 SharedFlow 和 StateFlow 都是一种特殊的 Flow 的变种SharedFlow 是把 Flow 的功能从数据流的收集改成了事件流StateFlow 是 SharedFlow 的细化细分StateFlow 将事件流改成了状态订阅经过这两个 Flow 的变种一下就把 Flow 的适用场景切换到了一个非常实用的范围。
但是需要了解 SharedFlow 和 StateFlow 就必须得掌握 Flow所以在开始这篇文章前建议回看 Flow 的功能定位、工作原理及应用场景 和 Flow 的创建、收集和操作符在掌握 Flow 的基础上学习 SharedFlow 和 StateFlow 才能更好的了解透彻。
SharedFlow
数据流的收集和事件订阅的区别
在前面的章节我们有讲过一个用 Flow 持续获取实时天气信息的例子
var count 0// 只负责数据的生产获取
val weatherFlow flow {while (count 10) {emit(getWeather())countdelay(1000)}
}suspend fun getWeather() withContext(Dispatchers.IO) {Sunny
}fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)scope.launch {weatherFlow.collect {println(count ${count}, Weather: $it)}println(done)}delay(15000)
}通过调用 collect() [订阅] 接收数据。说是 [订阅] 更确切的说其实是 [收集]Flow 虽然是个数据流但是它只是设定好了数据流的规则而并不是直接开始启动数据流开始生产生产过程是在调用 collect() 时才启动的并且是每次调用 collect() 都分别启动一个新的数据流。这也是为什么 Flow 提供数据收集的函数名为 collect() 而不是 subscribe()。
按上面这么说你或许会有疑惑既然数据收集这个视角它这么没用为什么不一开始就把 Flow 设计成事件订阅的视角
因为 数据收集并不是没用而是更加通用而已。事件订阅的场景比普通的数据收集要多得多但并不能简单的说它更有用而是它更专、更垂直。
实际上事件订阅就是一种特殊类型的数据收集用数据收集的功能是能实现事件订阅的功能这种事件订阅的 API 在 Flow 也有提供就是 SharedFlow。
launchIn() 和 shareIn() 的区别
在讲解 Flow 操作符时有提到一个函数 launchIn()它会立即用你指定的 CoroutineScope 启动一个协程然后调用 collect() 在这个协程启动收集流程
Collect.ktpublic fun T FlowT.launchIn(scope: CoroutineScope): Job scope.launch {collect() // tail-call
}还有一个函数 shareIn()shareIn() 可以将一个已存在的 Flow 转换成 SharedFlow同样的也是调用 collect() 收集
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 通过 shareIn() 将 Flow 转换成 SharedFlowval sharedFlow flow.shareIn(scope)scope.launch {sharedFlow.collect { println(SharedFlow in Coroutine: $it)}}delay(10000)
}输出结果
SharedFlow in Coroutine: 1
SharedFlow in Coroutine: 2
SharedFlow in Coroutine: 3shareIn() 还可以定制指定数据收集的时间比如修改为 SharingStarted.Eagerly 立即启动
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 指定启动收集时间为 Eagerly创建 SharedFlow 的同时立即启动生产val sharedFlow flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {sharedFlow.collect { println(SharedFlow in Coroutine: $it)}}delay(10000)
}shareIn() 其实是会创建一个新的 Flow把上游 Flow 发送的每条数据转发到下游每个调用 collect() 的 FlowCollector
Share.ktpublic fun T FlowT.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int 0
): SharedFlowT {val config configureSharing(replay)// 创建了一个 SharedFlow转发上游发送的数据val shared MutableSharedFlowT(replay replay,extraBufferCapacity config.extraBufferCapacity,onBufferOverflow config.onBufferOverflow)Suppress(UNCHECKED_CAST)val job scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}launchIn() 和 shareIn() 都是启动一个协程并在协程里调用 Flow 的 collect()但它们有两点区别 shareIn() 并不是第一时间就启动 Flow 的收集可以通过参数定制启动收集的时间 shareIn() 会创建一个新的 Flow 并返回返回的 Flow 类型就是 SharedFlowSharedFlow 实际上只是把上游 Flow 发送的每条数据做转发
SharedFlow 与 Flow、Channel 的区别
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)// 上游 Flow 数据val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// shareIn() 创建了一个新的 Flow 为 SharedFlowval sharedFlow flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {// 上游 Flow 已经准备了数据所以这里的 SharedFlow 其实是转发上游的数据下来收集sharedFlow.collect { println(SharedFlow in Coroutine: $it)}}delay(10000)
}从这个角度分析shareIn() 创建了 SharedFlow 把 [数据生产和数据收集流程分拆开]这个特点让 SharedFlow 相比传统的 Flow倒不如说 SharedFlow 更像是 Channel。
但 SharedFlow 和 Channel 不一样的是SharedFlow 不是瓜分式的而是每条数据都会发送到每一个进行中的 collect()。
比如下面我们调用多个 collect()正常是会完整打印所有数据和 Flow 一样
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {sharedFlow.collect { println(SharedFlow in Coroutine 1: $it)}}scope.launch {sharedFlow.collect { println(SharedFlow in Coroutine 2: $it)}} delay(10000)
}输出结果
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3普通的 Flow 多次调用 collect() 都独立完整跑一次流程SharedFlow 是多次调用 collect() 只跑一次流程即用 SharedFlow 事件订阅调用 collect() 发生在数据发送之后调用 collect() 前发送的数据将丢失。
为了验证上面的说法我们给调用 collect() 前分别加延迟
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {delay(500)// SharedFlow 设置了调用 sharedIn() 就立即启动// 这里延迟 500ms 后才调用的 collect()错过了第一条数据sharedFlow.collect { println(SharedFlow in Coroutine 1: $it)}}scope.launch {delay(1500)// 这里延迟 1500ms 后才调用的 collect()错过了两条数据sharedFlow.collect { println(SharedFlow in Coroutine 2: $it)}} delay(10000)
}Flow 的输出结果
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3SharedFlow 的输出结果
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 2: 3
SharedFlow in Coroutine 1: 3Flow 还是会正常打印完整这里 SharedFlow 分别加了延迟后都错过了数据接收没有打印出来。这就是 SharedFlow。
接下来我们再聊聊 [冷] 和 [热] 的话题。
在官方说法Channel 是 [热] 的Flow 是 [冷] 的Channel 的 [热] 其实就是不读取数据它也可以发送Flow 的 [冷] 是只在每次 collect() 被调用的时候才会启动数据发送流程。
SharedFlow 虽然是 Flow但它是 [热] 的因为 SharedFlow 的活跃状态跟它是否正在被调用 collect() 函数来收集数据是无关的所以它的活跃状态是独立的这就跟 Channel 一样了所以它是 [热] 的。
SharedFlow 的 [热] 和 Channel 的 [热] 不太一样Channel 的 [热] 是真的数据的发送和读取两个流程完全独立的SharedFlow 的 [热] 其实并不是技术角度的描述而是业务逻辑角度的它的本质依然是在 collect() 被调用时才开始生产本质上 SharedFlow 依然是 [冷] 的但是由于它背靠着一个独立运作的 Flow所以它生产出来的数据跟 collect() 的调用并没有绑定而是独立生产的。
所以我们说 SharedFlow 是 [冷] 的那就是从技术角度分析说它是 [热] 的那就是从业务逻辑角度分析两个说法都对。
对 SharedFlow 调用多次 collect() 虽然它被收集了多次但它们的数据源是同一套而不是各自一套这就是共享。
shareIn() 适用场景
shareIn() 适用场景 数据来源共享如果想要一个 Flow 它被收集多次的时候都可以共享相同的数据生产流程就可以用 shareIn() 将 Flow 转成 SharedFlow再让下游去收集 SharedFlow多次的收集之间是依赖的同一个数据流 生产提前启动SharedFlow 能做到数据生产的提前启动如果有一个 Flow 有耗时的初始化的操作但不希望在调用 collect() 的时候等待这个初始化也可以将 Flow 转成 SharedFlow因为在这里的目的并不是共享而是为了提前启动生产 事件订阅因为 SharedFlow 是 [热] 的生产流程是独立的那么在开始生产之后才开始收集那就会漏掉之前生产的数据所以 SharedFlow 也适合对从头开始收集数据没有需求的场景
SharedFlow 的效果是把 [数据生产和数据收集流程分拆开]这个效果让 SharedFlow 可以满足各种需求场景比如事件订阅、提前启动生产、数据来源共享等通常来讲我们也会把它用在事件流订阅的场景。
shareIn() 的适用场景本质上就是 [数据生产和数据收集流程分拆开] 的需求都可以将 Flow 转成 SharedFlow 来解决。SharedFlow 的 [热] 就是我们使用 SharedFlow 的根本原因。
SharedFlow 并不会因为生产流程的结束而结束订阅即数据生产都发送完了SharedFlow 的 collect() 会一直运行直到外部协程的取消而抛异常结束。
shareIn() 的具体参数说明
Share.ktpublic fun T FlowT.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int 0
): SharedFlowT {val config configureSharing(replay)val shared MutableSharedFlowT(replay replay,extraBufferCapacity config.extraBufferCapacity,onBufferOverflow config.onBufferOverflow)Suppress(UNCHECKED_CAST)val job scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}shareIn() 有三个参数第一个参数在前面小节也有讲到是指定启动事件订阅时所在的协程主要是讲后面的两个参数。
shareIn() 的 replay 参数
replay 参数第一个功能是缓冲可以做到根据设置的数值暂存数据。
比如在前面小节用 SharedFlow 收集数据延迟一段时间在调用 collect()最终结果就是会丢失数据因为 SharedFlow 创建的同时也已经启动了生产流程。
现在我们设置 replay 参数暂存一条数据
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 设置 replay 参数为 1表示暂存缓冲一条数据val sharedFlow flow.shareIn(scope, SharingStarted.Eagerly, 1)scope.launch {delay(500)sharedFlow.collect { println(SharedFlow in Coroutine 1: $it)}}scope.launch {delay(1500)sharedFlow.collect {println(SharedFlow in Coroutine 2: $it)}}delay(10000)
}输出结果
// SharedFlow in Coroutine 1: 1这个来不及消费的数据打印了出来
// SharedFlow in Coroutine 2: 1没有打印因为缓冲暂存 1 条数据所以第一条数据被丢弃但打印了第二条数据 SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 2: 3
SharedFlow in Coroutine 1: 3将 replay 参数设置为 1 暂存一条数据第一个 collect() 来不及消费的第一条数据也正常打印了出来第二个 collect() 因为错过了两条数据但暂存数据只有一条所以第一条数据被丢弃了第二条数据正常打印。
replay 参数第二个功能是缓存对于已经消费过的数据也依然缓冲下来用来给后面新订阅的 collect() 使用。简单说就是除了缓冲来不及消费的数据还会把已经消费过的数据也缓冲下来。
用个例子说明下 replay 缓存的功能
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// replay 设置缓存两条数据val sharedFlow flow.shareIn(scope, SharingStarted.Eagerly, 2)scope.launch {delay(500)sharedFlow.collect { println(SharedFlow in Coroutine 1: $it)}}scope.launch {delay(5000) // 都错过了数据订阅时会立即被发送出来sharedFlow.collect {println(SharedFlow in Coroutine 2: $it)}}delay(10000)
}输出结果
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 2 // 立即收到缓存发送的数据
SharedFlow in Coroutine 2: 3 // 立即收到缓存发送的数据上面的例子可以看到第二个 collect() 已经都错过了数据的发送但在调用 collect() 时还是能立即接收到数据接收到的数据数量是 replay 设置的大小。
总结下 replay 参数的功能 replay 参数是即有缓冲功能又有缓存功能的缓冲提前生产数据但收集靠后时要缓冲暂存多少漏接收的数据 在来不及消费的时候可以先把数据缓冲下来缓冲的尺寸就是 replay 的大小 对于已经使用完的数据它也会继续缓存下来等到有新订阅的时候直接发送出来缓存的大小也是 replay 的大小
shareIn() 的 started 参数
started 是用于设置数据生产的启动时间。它有三个数值 SharingStarted.Eagerly调用 shareIn() 创建 SharedFlow 的同时立即启动数据的生产 SharingStarted.Lazily调用第一次 collect() 时才会启动数据的生产 SharingStarted.WhileSubscribed()可以把上游的数据流给结束和重启的规则它是一种复杂化的 Lazily不仅是在第一次订阅的时候启动上游的数据流而且在下游所有订阅全都结束之后它会把上游 Flow 的生产流程也结束掉这时候如果再有订阅它就会重新启动上游的数据流
在讲解 WhileSubscribed() 前插入一个话题SharedFlow 的 collect() 订阅并不会因为上游 Flow 数据发送完成而结束。SharedFlow 的 collect() 返回值是 Nothing
SharedFlow.ktoverride suspend fun collect(collector: FlowCollectorT): Nothing一个返回值为 Nothing说明它永远不会返回一直运行下去除非抛出异常。比如 SharedFlow 这里就可以通过外部协程的取消或抛出其他异常取消 SharedFlow 的订阅。
继续回到 WhileSubscribed()模拟调用第一个 collect() 后结束了 Flow 的生产流程调用第二个 collect() 时会重新启动生产流程但这时候使用的是上一个 collect() 生产过的数据缓存
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow flow.shareIn(scope, SharingStarted.WhileSubscribed(), 2)scope.launch {val parent thislaunch {// 第二个 collect() 订阅前取消了第一个 SharedFlow 的订阅触发上游重启生产delay(4000) // 通过取消协程的方式结束 SharedFlow 的订阅// 模拟 SharedFlow 所有 collect() 结束了才让第二个 collect() 能触发上游的 Flow 的重启parent.cancel()}delay(1500)sharedFlow.collect { println(SharedFlow in Coroutine 1: $it)}}scope.launch {delay(5000)sharedFlow.collect {println(SharedFlow in Coroutine 2: $it)}}delay(10000)
}输出结果
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 2 // 立即打印出来使用了上一个 collect() 的数据缓存
SharedFlow in Coroutine 2: 3 // 立即打印出来使用了上一个 collect() 的数据缓存
SharedFlow in Coroutine 2: 1 // 重启上游 Flow 生产发送的数据
SharedFlow in Coroutine 2: 2 // 重启上游 Flow 生产发送的数据
SharedFlow in Coroutine 2: 3 // 重启上游 Flow 生产发送的数据上面的例子通过第一个 collect() 手动取消外部协程的方式模拟第二个 collect() 触发重启上游 Flow 生产发送。从打印可以看到第二个 collect() 在重启前会先收到上一个 collect() 的缓存数据然后重新接收到了上游 Flow 重启发送的数据。
WhileSubscribed() 的参数及适用场景
SharingStarted.ktpublic fun WhileSubscribed(stopTimeoutMillis: Long 0,replayExpirationMillis: Long Long.MAX_VALUE
): SharingStarted StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)WhileSubscribed() 还可以定制参数 stopTimeoutMillis所有 collect() 结束之后依然不判定为 [所有 collect() 都结束]等待该参数设置的时间后还没有新的 collect()才认为是都结束将上游 Flow 的生产流程结束 replayExpirationMillis设置订阅流程结束之后、清空缓存的让缓存失效的时间最后一个 collect() 结束且 stopTimeoutMillis 也超时之后再过 replayExpirationMillis 时间后还是没有新的 collect() 被调用缓存下来的数据就会被丢弃
WhileSubscribed() 的适用场景假设软件里有一个可以被订阅的事件流这个事件流会在多个地方被订阅而同时这个事件流还非常的重即生产流程非常消耗资源所以想要在所有订阅都结束的时候及时的结束生产这种场景就很适合用 WhileSubscribed() 配置自动结束、自动重启的 SharedFlow。
MutableSharedFlow、asSharedFlow()
MutableSharedFlow
一直以来我们讲解 Flow 都是在内部调用 emit() 生产数据然后在一个地方调用 collect() 收集发送过来的数据但我们的业务需求可能需要能支持在外部调用 emit() 发送数据比如 UI 交互点击事件在用户点击按钮的时候可以从它的点击监听回调能调用一下 emit()而不是只能从上游的 Flow 把事件发送出来这是一种很正常的需求。
需要能在外部调用 emit() 发送数据要用 MutableSharedFlow
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val mutableSharedFlow MutableSharedFlowString()scope.launch {mutableSharedFlow.emit(Hello) // 可以外部发送数据mutableSharedFlow.collect {println(mutableSharedFlow: $it)}}delay(10000)
}或许你会有疑问为什么 Flow 不直接提供可以外部发送数据的 Flow
Flow 不提供外部发送数据的原因也很简单Flow 数据流本就是一个需要指定规则然后按规则一条条把数据发送出来本来就不需要从外部发送数据如果还提供外部发送数据内部和外部的数据混乱数据源不统一反而更容易让开发者不小心写出错误代码。
SharedFlow 是事件流它天然就是需要从各个地方发送数据的Flow 数据流的限制就不需要了允许让我们在任何协程发送数据。
MutableSharedFlow 相比 shareIn() 并不是从内部发送数据变成了外部发送数据而是从只能从上游 Flow 发送数据变成可以从任何协程发送数据。
MutableSharedFlow 和 shareIn() 的选择 如果要创建一个事件流在外部生产数据发送数据源就用 MutableSharedFlow 如果已经有了一个生产事件流的 Flow不需要自己写生产数据的代码直接将 Flow 用 shareIn() 转成 SharedFlow 即可
asSharedFlow()
在 Android 经常会在 ViewModel 定义 MutableSharedFlow在数据请求后通过 MutableSharedFlow 调用 emit() 将结果通知到页面 Activity 或 Fragment也就是页面需要订阅事件希望把 MutableSharedFlow 暴露出来给外部去订阅但又不希望让外部也来发送数据的时候可以通过 asSharedFlow() 将 MutableSharedFlow 转换成 SharedFlow
class MyViewModel : ViewModel() {private val repository Repository()private val flow MutableSharedFlowString()val sharedFlow flow.asSharedFlow() // 提供给外部订阅fun request() {viewModelScope.launch {repository.request {flow.emit(it)}} }
}class MyActivity : ComponentActivity() {private val viewModel by viewModelsMyViewModel()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)lifecycleScope.launch {viewModel.sharedFlow.collect {// ...}}}
}StateFlow、MutableStateFlow、asStateFlow()
StateFlow 是一种特殊的 SharedFlowSharedFlow 是把数据流的收集收窄到了事件流的订阅StateFlow 则是进一步的收窄从事件流的订阅收窄到了状态的订阅。
StateFlow.ktpublic interface StateFlowout T : SharedFlowT {// 最新的一条数据public val value: T
}可以看到 StateFlow 其实就是 SharedFlow 的子接口并且增加了一个 value 属性。value 就是 SharedFlow 里最新的一条数据所以 StateFlow 实际上就是一个仅仅保存最新的一个事件的 SharedFlow 事件流。
我们用 StateFlow 实现状态订阅
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val name MutableStateFlow(test) // 提供初始值scope.launch { name.collect {println(State: $it)}}scope.launch {delay(2000)name.emit(Hello world)}delay(10000)
}输出结果
State: test
State: Hello world如果想 将 Flow 转换成 StateFlow 也可以使用 stateIn()
fun main() runBlocking {val scope CoroutineScope(EmptyCoroutineContext)val flow flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 通过 stateIn() 将 Flow 转换成 StateFlowval stateFlow flow.stateIn(scope)scope.launch { stateFlow.collect {println(State: $it)}}delay(10000)
}asStateFlow() 可以把可读写的 MutableStateFlow 转换为只读的 StateFlow用来对外暴露的时候把写数据的功能给隐藏。
总结
1、数据流的收集和事件订阅的区别
Flow 数据流的数据收集相比事件订阅场景更加通用事件订阅的场景比普通的数据收集要多得多但并不能简单的说它更有用而是它更专、更垂直。
实际上事件订阅就是一种特殊类型的数据收集用数据收集的功能是能实现事件订阅的功能这种事件订阅的 API 在 Flow 也有提供就是 SharedFlow。
2、launchIn() 和 shareIn() 的区别
launchIn() 和 shareIn() 都是启动一个协程并在协程里调用 Flow 的 collect()但它们有两点区别 shareIn() 并不是第一时间就启动 Flow 的收集可以通过参数定制启动收集的时间 shareIn() 会创建一个新的 Flow 并返回返回的 Flow 类型就是 SharedFlowSharedFlow 实际上只是把上游 Flow 发送的每条数据做转发
3、SharedFlow 与 Flow、Channel 的区别
1SharedFlow 和 Flow 的区别
普通的 Flow 多次调用 collect() 都独立完整跑一次流程SharedFlow 是多次调用 collect() 只跑一次流程即用 SharedFlow 事件订阅调用 collect() 发生在数据发送之后调用 collect() 前发送的数据将丢失。
2SharedFlow 和 Channel 的区别
在官方说法Channel 是 [热] 的Flow 是 [冷] 的Channel 的 [热] 其实就是不读取数据它也可以发送Flow 的 [冷] 是只在每次 collect() 被调用的时候才会启动数据发送流程。
SharedFlow 虽然是 Flow但它是 [热] 的因为 SharedFlow 的活跃状态跟它是否正在被调用 collect() 函数来收集数据是无关的所以它的活跃状态是独立的这就跟 Channel 一样了所以它是 [热] 的。
SharedFlow 的 [热] 和 Channel 的 [热] 不太一样Channel 的 [热] 是真的数据的发送和读取两个流程完全独立的SharedFlow 的 [热] 其实并不是技术角度的描述而是业务逻辑角度的它的本质依然是在 collect() 被调用时才开始生产本质上 SharedFlow 依然是 [冷] 的但是由于它背靠着一个独立运作的 Flow所以它生产出来的数据跟 collect() 的调用并没有绑定而是独立生产的。
所以我们说 SharedFlow 是 [冷] 的那就是从技术角度分析说它是 [热] 的那就是从业务逻辑角度分析两个说法都对。
对 SharedFlow 调用多次 collect() 虽然它被收集了多次但它们的数据源是同一套而不是各自一套这就是共享。
4、shareIn() 的适用场景 数据来源共享如果想要一个 Flow 它被收集多次的时候都可以共享相同的数据生产流程就可以用 shareIn() 将 Flow 转成 SharedFlow再让下游去收集 SharedFlow多次的收集之间是依赖的同一个数据流 生产提前启动SharedFlow 能做到数据生产的提前启动如果有一个 Flow 有耗时的初始化的操作但不希望在调用 collect() 的时候等待这个初始化也可以将 Flow 转成 SharedFlow因为在这里的目的并不是共享而是为了提前启动生产 事件订阅因为 SharedFlow 是 [热] 的生产流程是独立的那么在开始生产之后才开始收集那就会漏掉之前生产的数据所以 SharedFlow 也适合对从头开始收集数据没有需求的场景
SharedFlow 的效果是把 [数据生产和数据收集流程分拆开]这个效果让 SharedFlow 可以满足各种需求场景比如事件订阅、提前启动生产、数据来源共享等通常来讲我们也会把它用在事件流订阅的场景。
shareIn() 的适用场景本质上就是 [数据生产和数据收集流程分拆开] 的需求都可以将 Flow 转成 SharedFlow 来解决。SharedFlow 的 [热] 就是我们使用 SharedFlow 的根本原因。
SharedFlow 并不会因为生产流程的结束而结束订阅即数据生产都发送完了SharedFlow 的 collect() 会一直运行直到外部协程的取消而抛异常结束。
5、shareIn() 的具体参数
1shareIn() 的 replay 参数
replay 参数是即有缓冲功能又有缓存功能的缓冲提前生产数据但收集靠后时要缓冲暂存多少漏接收的数据 在来不及消费的时候可以先把数据缓冲下来缓冲的尺寸就是 replay 的大小 对于已经使用完的数据它也会继续缓存下来等到有新订阅的时候直接发送出来缓存的大小也是 replay 的大小
2shareIn() 的 started 参数
started 是用于设置数据生产的启动时间。它有三个数值 SharingStarted.Eagerly调用 shareIn() 创建 SharedFlow 的同时立即启动数据的生产 SharingStarted.Lazily调用第一次 collect() 时才会启动数据的生产 SharingStarted.WhileSubscribed()可以把上游的数据流给结束和重启的规则它是一种复杂化的 Lazily不仅是在第一次订阅的时候启动上游的数据流而且在下游所有订阅全都结束之后它会把上游 Flow 的生产流程也结束掉这时候如果再有订阅它就会重新启动上游的数据流
除此之外还插入说明了 SharedFlow 的 collect() 订阅并不会因为上游 Flow 数据发送完成而结束。
SharedFlow 的 collect() 返回值为 Nothing说明它永远不会返回一直运行下去除非抛出异常。比如可以通过外部协程的取消间接取消 SharedFlow 的订阅。
3WhileSubscribed() 的适用场景
假设软件里有一个可以被订阅的事件流这个事件流会在多个地方被订阅而同时这个事件流还非常的重即生产流程非常消耗资源所以想要在所有订阅都结束的时候及时的结束生产这种场景就很适合用 WhileSubscribed() 配置自动结束、自动重启的 SharedFlow。
6、MutableSharedFlow、asSharedFlow()
1MutableSharedFlow
需要能在外部调用 emit() 发送数据要用 MutableSharedFlow。
Flow 不提供外部发送数据的原因也很简单Flow 数据流本就是一个需要指定规则然后按规则一条条把数据发送出来本来就不需要从外部发送数据如果还提供外部发送数据内部和外部的数据混乱数据源不统一反而更容易让开发者不小心写出错误代码。
SharedFlow 是事件流它天然就是需要从各个地方发送数据的Flow 数据流的限制就不需要了允许让我们在任何协程发送数据。
MutableSharedFlow 和 shareIn() 的选择 如果要创建一个事件流在外部生产数据发送数据源就用 MutableSharedFlow 如果已经有了一个生产事件流的 Flow不需要自己写生产数据的代码直接将 Flow 用 shareIn() 转成 SharedFlow 即可
2asSharedFlow()
希望把 MutableSharedFlow 暴露出来给外部去订阅但又不希望让外部也来发送数据的时候可以通过 asSharedFlow() 将 MutableSharedFlow 转换成 SharedFlow。
7、StateFlow、MutableStateFlow、asStateFlow()
StateFlow 其实就是 SharedFlow 的子接口StateFlow 实际上就是一个仅仅保存最新的一个事件的 SharedFlow 事件流。
将 Flow 转换成 StateFlow 也可以使用 stateIn()。
asStateFlow() 可以把可读写的 MutableStateFlow 转换为只读的 StateFlow用来对外暴露的时候把写数据的功能给隐藏。