asp 英文企业网站 免费,国际新闻最新新闻,中小型网站开发,做网站 外文参考文献Kotlin 协程基础系列#xff1a; Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发#xff08;一#xff09; Kotlin 协程基础三 —— 结构化并发#xff08;二#xff09; Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程… Kotlin 协程基础系列 Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发一 Kotlin 协程基础三 —— 结构化并发二 Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程基础五 —— Channel Kotlin 协程基础六 —— Flow Kotlin 协程基础七 —— Flow 操作符一 Kotlin 协程基础八 —— Flow 操作符二 Kotlin 协程基础九 —— SharedFlow 与 StateFlow Kotlin 协程基础十 —— 协作、互斥锁与共享变量 本节将介绍在协程间如果有先后执行、互相等待的需求时应该怎样去处理这种等待和协作的工作。更会与 Java 的线程的协作工作对比从而说明在线程中通常不太简单的协作操作在协程中很容易实现。
1、协程间的协作与等待
从运行角度来看协程天生就是并行的不论是对同等级的协程、父子协程还是毫无关系的协程。假如我们需要让协程互相等待希望在协程的执行过程中可以停住等待别的协程执行完毕可以使用 Job 的 join() 或 Deferred 的 await()。
线程对于这种互相等待的需求可以通过 Thread 的 join()还有 Future 和 CompletableFuture 以及 CountDownLatch。
CountDownLatch 适用于一个线程等待多个线程
fun main() runBlockingUnit {// countdown 译为倒计时latch 是门闩、插销组合起来就是用于倒计时的插销val countDownLatch CountDownLatch(2)thread {// await() 会在 CountDownLatch 内的 count 减到 0 时结束等待countDownLatch.await()println(Count in CountDownLatch is 0 now,Im free!)}thread {sleep(1000)// countDown() 会调用原子操作让 CountDownLatch 内的 count 减 1countDownLatch.countDown()println(Invoke countDown,count: ${countDownLatch.count})}thread {sleep(2000)countDownLatch.countDown()println(Invoke countDown,count: ${countDownLatch.count})}
}运行结果
Invoke countDown,count: 1
Count in CountDownLatch is 0 now,Im free!
Invoke countDown,count: 0修改 CountDownLatch 构造方法的 count 参数就可以修改要等待的线程数量对于这种一个等待多个的业务需求在协程中也可以用 join() 来做
fun main() runBlockingUnit {// 两个前置任务val preJob1 launch {delay(1000)}val preJob2 launch {delay(2000)}// 此协程需要等待两个协程执行之后再运行自己的内容launch {preJob1.join()preJob2.join()// 等待完前置任务再做自己的事...}
}实际上线程里也可以这么做只不过因为线程本身的结构化管理比较麻烦所以在正式的项目里很少真正的这么写。但因为协程可以结构化取消因此它的 join() 比线程的 join() 更实用在正式项目里的应用也较多。
其实用 Channel 也能实现类似 CountDownLatch 那种不指定具体等待哪些协程只等待固定的次数的效果
private fun channelSample() runBlockingUnit {// 指定 Channel 的容量为 2val channel ChannelUnit(2)// 由于要等待两次发送数据才能继续执行后续代码因此要 repeat(2) 接收launch {repeat(2) {channel.receive()}}launch {delay(1000)channel.send(Unit)}launch {delay(2000)channel.send(Unit)}
}通过两个简单的例子可以发现线程中有些复杂、比较底层、不太容易使用的协作和等待 API在协程中的对应/等价 API 难度要大大降低。
2、select()先到先得
select() 会在内部开启多线竞赛谁最快就用谁。
onJoin() 是仅限于在 select 代码块中才能调用的函数它是一个监听注册会监听 Job 的结束不论 Job 是正常结束还是被取消在其结束时都会回调执行 onJoin() 大括号的内容并且大括号的返回值会作为 select() 的返回值
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val job1 scope.launch {delay(1000)println(job1 done)}val job2 scope.launch {delay(2000)println(job2 done)}val job scope.launch {val result select {// select 只执行最先结束的 onJoin 回调job1.onJoin {1}job2.onJoin {2}}println(result: $result)}joinAll(job, job1, job2)
}运行结果
job1 done
result: 1
job2 done结果能看出select() 只执行了最先结束的 job1 的 onJoin没有执行 job2 的。
与 Job 的 onJoin() 功能类似的还有Deferred 的 onAwait()、Channel 的 onSend()、onReceive() 以及 onReceiveCatching()。此外还有一个特殊的函数 onTimeout()如果 select() 内所有的监听回调都没有在 onTimeout() 设置的超时时间内完成那么就由 onTimeout() 作为 select() 的返回值
OptIn(ExperimentalCoroutinesApi::class)
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val job1 scope.launch {delay(1000)println(job1 done)}val deferred scope.async {delay(2000)println(deferred done)}val channel ChannelString()val job scope.launch {val result select {// select 只执行最先结束的 onJoin 回调job1.onJoin {1}deferred.onAwait {2}channel.onSend(haha) {}/*channel.onReceive {}channel.onReceiveCatching {}*/onTimeout(500) {Timeout!}}println(result: $result)}joinAll(job, job1)
}运行结果
result: Timeout!
job1 done3、互斥锁和共享变量
在遇到一个不太好理解的知识点时我们还是先说线程再引入到协程中。
线程中有一个术语叫竞态条件或者说竞争条件英文是 race condition。这个词的含义比较广在 Java 和 Kotlin 这种高级编程语言中它指的是多个线程访问共享资源时由于缺乏并发控制导致资源的访问顺序不受控进而导致出现错误的结果的条件。
在 Kotlin 中仍然可以使用我们在 Java 中熟知的 synchronized 和 Lock 这两种锁机制来保证共享资源的线程安全也提供了新的选项下面我们来说一说。
3.1 Synchronized
Kotlin 中没有 synchronized 关键字代替它的是 Synchronized 注解。对于方法而言使用 Synchronized 注解的作用与 Java 中使用 synchronized 关键字修饰方法的作用是一样的。被 Synchronized 注解标记的方法不能同时被多个线程注意不是协程执行。
而 Java 中 synchronized 代码块在 Kotlin 中被 synchronized 函数代替了
fun main() runBlockingUnit {var number 0val lock Any()val thread1 thread {repeat(100_000) {synchronized(lock) {number}}}val thread2 thread {repeat(100_000) {synchronized(lock) {number--}}}thread1.join()thread2.join()println(result: $number) // 输出 0
}同样的代码结构也可以用在协程中
fun main() runBlockingUnit {var number 0val lock Any()val scope CoroutineScope(EmptyCoroutineContext)val job1 scope.launch {repeat(100_000) {synchronized(lock) {number}}}val job2 scope.launch {repeat(100_000) {synchronized(lock) {number--}}}job1.join()job2.join()println(result: $number)
}synchronized() 仍然掐住的是线程确切的说是掐住了执行 synchronized() 所在的协程的线程。虽然这样做有点浪费因为不止掐住了协程连运行该协程代码的线程都被掐住了但确实实现了共享资源的线程安全而且 synchronized() 本来也是针对线程的只不过从协程的角度看如果可以只掐住协程不影响运行该协程代码的线程就更好了。 这个区别就好像 delay() 与 sleep() 一样。协程的 delay() 只会挂起当前的协程但是不会影响其所在的线程而 sleep() 是让整个线程休眠。因此在协程中为了不影响整个线程我们通常都是使用 delay() 仅作用于当前协程而不会使用 sleep() 为了让协程挂起而影响到整个线程的运行。下一节要讲的 Mutex 就可以解决这个问题。 Lock 的用法也大致相同这里不多赘述。
3.2 Mutex
Mutex 是计算机领域的专属词汇全称是 mutual exclusion即互斥。Kotlin 提供的 Mutex 是基于协程的、挂起式的不同于前面两个是基于线程的、阻塞式的。Mutex 是协程自己的实现它不卡线程性能更好使用也很方便
fun main() runBlockingUnit {var number 0val mutex Mutex()val scope CoroutineScope(EmptyCoroutineContext)val job1 scope.launch {repeat(100_000) {try {mutex.lock()number} finally {mutex.unlock()}}}val job2 scope.launch {repeat(100_000) {mutex.withLock {number--}}}job1.join()job2.join()println(result: $number)
}job1 内使用的是常规用法在操作共享变量前用 lock() 加锁在 finally 代码块中解锁。job2 内使用的是简便写法withLock() 将代码块内的代码放入 try 中执行在 finally 中用 unlock() 解锁
OptIn(ExperimentalContracts::class)
public suspend inline fun T Mutex.withLock(owner: Any? null, action: () - T): T {contract {callsInPlace(action, InvocationKind.EXACTLY_ONCE)}lock(owner)return try {action()} finally {unlock(owner)}
}Mutex 的优势是性能但由于它是基于协程的因此只能在协程中使用。所以如果只在协程中使用共享资源那么就用 Mutex如果需要在线程中使用就要用上一节说的 synchronized 与 Lock。
3.3 Semaphore
Java 还有一个 Semaphore信号量一个可以被多个线程持有的锁。你可以在它的构造方法中指定它最多可以被几个线程持有如果有多余指定数量的线程去获取 Semaphore 就会陷入等待。获取锁用 acquire()释放锁用 release()。
由于共享变量是只要有两个线程同时访问就会导致出错了因此允许多个线程持有的 Semaphore 并不能用于解决竞态条件的问题它是用来做性能控制的。你可以用它来实现类似线程池的功能只不过你实现出来的是自己定制的对象池同一时间最多只有多少个对象同时在做事满了之后如果再来新对象就得等着直到有新的坑让出来这些新对象才能开始做事。
Kotlin 提供了一个 Semaphore 的协程版本就叫 Semaphore定位与 Java 的 Semaphore 相同只不过是协程版本。
3.4 其他 API
在传统的线程系统里还有一组典型的 APIwait()、notify()、notifyAll()。它们三个属于更底层的 API在线程系统里它既能实现互斥锁也能实现线程之间相互等待的功能。但事实上这些年已经基本没人再用这组函数了。因为 synchronized 关键字与 Lock 的推出已经基本上完全替代了它们而且它们用起来也很麻烦所以现在没人用。正因如此协程没有推出与它们类似的 API。
AtomicInteger 与 CopyOnWriteArrayList 等等也可以在协程中使用。虽然它们是针对线程的但是卡住线程的同时一定把协程也卡住了。所以在协程里也可以无风险地使用。
此外volatile 与 transient 也可以在协程中使用只不过不再是关键字而是注解。
4、ThreadLocal
ThreadLocal 是线程的局部变量即该变量在每个线程都是独立的从不同的线程中访问该变量这些线程对变量的值的读写都是相互独立的对每个线程都有独立的副本。
ThreadLocal 是用来干嘛的它的定位就像它的名字一样就是针对线程的局部变量。Java 变量按照作用域由小到大可以划分为局部变量方法内、成员变量类内、静态变量全局ThreadLocal 是一种介于局部变量和静态变量之间的一种变量范围比方法大比静态全局小只在当前线程范围内有效。
ThreadLocal 是对 Java 线程一个很关键的能力补充。前面提过协程相对线程的一大优势就是线程不具备结构化管理的能力而协程结构化管理的能力相当强大。线程不具备结构化管理的能力但我们开发时是有结构化管理的需求的这时就要用 ThreadLocal。有了 ThreadLocal 之后在同一个线程里执行的多个方法之间就可以共享变量了且该共享变量只针对当前线程有效跨线程时还是独立的。因此 ThreadLocal 通常会作为静态变量存在。
ThreadLocal 在协程中的等价物是什么有什么东西是跨方法的、针对协程的局部变量吗CoroutineContext 就是协程里的 ThreadLocal。
本来由于协程是具备结构化管理能力的你完全不需要在协程内使用 ThreadLocal。但是开发过程中免不了与 Java 代码进行协作如果想在协程代码里访问老代码里的 ThreadLocal 对象是不能像如下这样直接使用的
val kotlinLocalString ThreadLocalString()
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val job scope.launch {kotlinLocalString.set(Test)delay(1000)println(kotlinLocalString.get())}job.join()
}kotlinLocalString 的 get() 拿到的值一定是 set() 设置的值吗不一定因为虽然协程没变但是执行协程代码的线程有可能改变了delay() 的时候线程被让出可能会去执行其他协程的代码。等 delay() 结束继续执行下面代码的时候有可能就不是在刚才的线程中执行了。因为协程只能保证在执行挂起函数之后依然运行在刚才的 ContinuationInterceptor 所管理的某一个线程池上不能保证同一个线程。
因此 ThreadLocal 不能在协程中直接使用因为它的效果在协程中变得不可靠了。怎么办用 asContextElement() 把 ThreadLocal 转换成 CoroutineContext
val kotlinLocalString ThreadLocalString()
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val job scope.launch {val stringContext kotlinLocalString.asContextElement(Test)withContext(stringContext) {delay(1000)println(kotlinLocalString.get())}}job.join()
}asContextElement() 是 ThreadLocal 的扩展函数它会把参数里的值封装到返回值的 ThreadLocalElement 中。再将结果填到 withContext() 的参数中包住获取 ThreadLocal 值的代码这时候里面的 ThreadLocal 就是对协程兼容的了。不管里面怎么切协程只要没出协程它的值都会被保持住。