本文最后更新于 883 天前,其中的信息可能已经有所发展或是发生改变。
1. Flow替换Sequence(SequenceScope类上使用了RestrictsSuspension注解)
2. 冷数据流:flow (消费者消费时才会生产数据)
实际是通过SafeFlow类创建的Flow对象。SafeFlow继承自AbstractFlow。而AbstractFlow同时继承了Flow和 CancellableFlow两个接口。这也就意味着我们创建的冷数据流Flow是可以取消的。
– 取消:直接取消对应所在的协程即可
- 为了保证上下文一致性,禁止withContext进行线程调度,可以使用flowOn
-
flowOn:可多次调用,后面的只影响没有上下文的,
如:{A}.flowOn(Dispatchers.IO).{B}.flowOn(Dispatchers.Main)
第一个影响第一个,第二个不影响A,只影响B
3. 操作符
- 流程操作符:
onStart:在上游流启动之前被调用。
onEach:在上游流的每个值被下游发出之前调用。
onCompletion:在流程完成或取消后调用,并将取消异常或失败作为操作的原因参数传递。
异常操作符:
catch: 只捕获之前的,后续继续发生异常不会捕获。 - 转换操作符:
transform:
.transform {
emit(it)
emit("transform $it")
}
map/mapNotNull: 数据转换
.map {
it * 5
}
fliter: 过滤
.filter {
it < 2
}
zip: 组合流
flowA.zip(flowB) { valueA,valueB
"$valueA : $valueB"
}
…
– 限制操作符:
take: 发射次数大于等于count的值时,通过抛出异常来取消执行
.take(2) // 次数2,只发送2次
takeWhile: 类似filter,但会中断
.takeWhile {
it == 1 // 不是当前情况 直接终止
}
drop: 与take相反,是先丢掉不处理指定数目
`.drop(2) 丢掉前两次的数据流`
– 背压:
1. 使用buffer操作符进行一定的缓解
2. 其他处理
4. 热数据流:(数据在流之外生成然后传递到流)
- StateFlow : (与LiveData类似)
它的值是唯一的 处理状态
它允许被多个观察者共用 (因此是共享的数据流)
它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的必须配置初始值
value 空安全
StateFlow 订阅者仅仅只可以接受最新的和之后的值
防抖stateIn 将数据流转换为 StateFlow
- SharedFlow:
>SharedFlow 可以保留历史数据,订阅者可以获取之前的历史数据
>MutableSharedFlow 没有起始值(处理事件)
>MutableSharedFlow 发射值需要调用 emit()/tryEmit() 方法,没有 setValue() 方法
>MutableSharedFlow 不同于MutableStateFlow 不需要传入默认值
4.1 StateFlow和ShareFlow的使用
> StateFlow 使用状态任然存在和LiveData相同的问题(横竖屏切换等)
> MutableSharedFlow构造函数内包含一个 replay(整形)参数,其内部会通过参数生成一个容器记录最新的 replay个最新的数据,订阅者会默认从容器内获取。初始值0即不会存在↑的问题
CoroutineScope
CoroutineName(指定协程名称)
Job(协程的生命周期,用于取消协程)
CoroutineDispatcher,可以指定协程运行的线程
suspend,async,await
async里面如果发生异常是不会直接抛出的,直到 await 得到调用
结构化并发: B在A协程内创建,并且B在A执行完之前执行结束
viewModelScope 扩展
管理同时多个协程:(会在内部创建一个新的CoroutineScope启动执行协程,并追踪直到完成。)
coroutineScope 任意一个协程异常,全部都会停止
supervisorScope 任意一个协程异常不影响其他协程
withContext: 切换调度器
withContext(Dispatchers.IO) {
/* perform network IO here */
}
CoroutineName("") 协程命名
suspendCancellableCoroutine 和 suspendCoroutine 可以将回调函数转换为协程
private suspend fun <T> Call<T>.await(): T {
return suspendCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
override fun onResponse(call: Call<T>, response: Response<T>) {
val body = response.body()
if (body != null) continuation.resume(body)
else continuation.resumeWithException(RuntimeException("response body is null"))
}
})
}
}
Android ktx库