kotlin這個(gè)夏天java最有競(jìng)爭(zhēng)力的語言肛走。關(guān)于它的語法糖在這就不一一闡述了漓雅,畢竟它能甜死你。
先說說什么是協(xié)程吧,用戶態(tài)的子線程邻吞,輕量級(jí)组题,進(jìn)程->線程->協(xié)程。
進(jìn)程抱冷、線程崔列、協(xié)程的關(guān)系和區(qū)別:
進(jìn)程擁有自己獨(dú)立的堆和棧,既不共享堆旺遮,亦不共享?xiàng)U匝叮M(jìn)程由操作系統(tǒng)調(diào)度。
線程擁有自己獨(dú)立的棧和共享的堆耿眉,共享堆边翼,不共享?xiàng)#€程亦由操作系統(tǒng)調(diào)度(標(biāo)準(zhǔn)線程是的)鸣剪。
協(xié)程和線程一樣共享堆组底,不共享?xiàng)#瑓f(xié)程由程序員在協(xié)程的代碼里顯示調(diào)度筐骇。
協(xié)程的好處如下:
1.減少cpu線程上下文切換的開銷
2.降低了內(nèi)存消耗;
3.提高了cpu緩存命中率;
4.整體上提高了性能债鸡;
5.不提高硬件的前提下,提升了系統(tǒng)的負(fù)載能力拥褂。
只需要極少的棧內(nèi)存(大概是4~5KB)娘锁,默認(rèn)情況下,線程棧的大小為1MB饺鹃,一個(gè)線程可以開啟數(shù)十萬的協(xié)程莫秆,線程占用的內(nèi)存開銷遠(yuǎn)比協(xié)程要大得多。
golang原生就實(shí)現(xiàn)了協(xié)程悔详,由runtime自行管理镊屎,一個(gè)go關(guān)鍵字就能開啟goroutine。簡(jiǎn)直完美茄螃,但是今天要講的不是golang缝驳。
總之,協(xié)程就是便宜归苍,廉價(jià)用狱,高效的代名詞。
java里面要擁有這種高性能的協(xié)程拼弃,要通過第三方包來實(shí)現(xiàn)quasar夏伊,comsat,kilim
上面這三位吻氧,就是目前所有java里面能快速實(shí)現(xiàn)coroutines的jar溺忧。
quasar:通過織入java字節(jié)碼的方式咏连,改變字節(jié)碼結(jié)果,來使用鲁森,javaagent的方式
comsat:quasar的包裝版本祟滴,提供輕量化的包裝能快速使用。
kilim:和quasar一樣歌溉,也要織入字節(jié)碼來使用
但都有一個(gè)問題垄懂,必須預(yù)先給到注解,以上都能通過編譯研底,但是到了linux環(huán)境埠偿,需要通過javaagent,因字節(jié)碼被改寫榜晦,無法追蹤具體問題冠蒋。協(xié)程管理是個(gè)大問題,會(huì)被線程kill乾胶,無故消失抖剿,筆者通過大半個(gè)月的實(shí)驗(yàn),發(fā)現(xiàn)它們無法通過大部分環(huán)境识窿,因而放棄斩郎。
kotlin.corouties
kotlin.corouties真是個(gè)非常好的api。語法簡(jiǎn)化喻频,可以和golang的go關(guān)鍵字有得一拼缩宜。但在目前的kotlin api中是實(shí)驗(yàn)性質(zhì),不過已經(jīng)具備上生產(chǎn)環(huán)境的能力甥温,預(yù)計(jì)會(huì)在1.1.5中正式發(fā)布锻煌。因kotlin和java可以混編,所以coroutines是個(gè)下個(gè)高并發(fā)必備的知識(shí)點(diǎn)了姻蚓。
kotlin.corouties調(diào)度器
CommonPool 調(diào)度器默認(rèn)是通過fork/join的方式實(shí)現(xiàn)宋梧,目前還不提供接口,做自定義實(shí)現(xiàn)
launch(CommonPool)
Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
It uses[java.util.concurrent.ForkJoinPool]when available, which implements efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even when it already executes inside the pool.When available, it wraps ForkJoinPool.commonPool and provides a similar shared pool where not.
也就是說狰挡,kotlin的協(xié)程是并行調(diào)度的捂龄,關(guān)于fork/join也可以單獨(dú)開一章講了,暫不表加叁。
Unconfined 調(diào)度器倦沧,默認(rèn)是主線程調(diào)度 ,無限制啟動(dòng)協(xié)程它匕,一旦協(xié)程睡了或者掛了刀脏,會(huì)啟動(dòng)新的協(xié)程
launch(Unconfined)
A coroutine dispatcher that is not confined to any specific thread.
It executes initial continuation of the coroutine right here in the current call-frame
and let the coroutine resume in whatever thread that is used by the corresponding suspending function, without
mandating any specific threading policy.
Note, that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
but still want to execute it in the current call-frame until its first suspension, then you can use
an optional [CoroutineStart] parameter in coroutine builders like [launch] and [async] setting it to the
the value of [CoroutineStart.UNDISPATCHED].
ThreadPoolDispatcher.newSingleThreadContext調(diào)度器,單個(gè)線程的調(diào)度器
launch(newSingleThreadContext("MyOwnThread"))
Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the thread of this context.
Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
The specified [name] defines the name of the new thread.
An optional [parent] job may be specified upon creation.
launch(newFixedThreadPoolContext(100,"MyOwnThread")) 調(diào)度器超凳,指定線程數(shù)量的調(diào)度器
Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the threads of this context.
Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
The specified [name] defines the names of the threads.
An optional [parent] job may be specified upon creation.
默認(rèn)請(qǐng)全部使用launch(CommonPool)愈污,有特殊的限制問題,再考慮其他的調(diào)度器
launch(CommonPool) 異步協(xié)程開啟
async(CommonPool) 同步協(xié)程開啟
官方示例的Hello,World!轮傍,歡迎進(jìn)入kotlin協(xié)程的世界
fun main(args: Array<String>) {
launch(CommonPool) { // create new coroutine in common thread pool
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main function continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
kotlin中的sleep將不能暫停協(xié)程了暂雹,是個(gè)大坑,后面會(huì)講到创夜。
launch 啟動(dòng)協(xié)程杭跪,默認(rèn)情況下直接開始執(zhí)行,也可以顯式執(zhí)行
var job= launch(CommonPool)
if(job.isActive){
job.cancel()
}else{
job.start()
}
job任務(wù)可以根據(jù)需要什么時(shí)候開始執(zhí)行驰吓,是否存活涧尿,取消等,提供了一系列api
有個(gè)小事檬贰,kotlin去掉了姑廉; 估計(jì)這個(gè)又可以引發(fā)一波大戰(zhàn)
CommonPool 調(diào)度器
delay將會(huì)暫停1秒?yún)f(xié)程運(yùn)行,
printlin是kotlin的打印方法翁涤,等同于System.out.printlin
Thread.sleep 這句只能暫停啟動(dòng)協(xié)程的線程
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
runBlocking<Unit> 啟動(dòng)一個(gè)非阻塞并且無返回值的任務(wù)
job.join() 等待協(xié)程任務(wù)完成
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
doWorld()
}
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
這個(gè)講suspend 關(guān)鍵字桥言,為的是代碼分離,不然就只能在 launch(CommonPool){}內(nèi)部用delay來睡協(xié)程了葵礼,去掉了suspend是無法在其他方法調(diào)用delay睡協(xié)程了号阿,直接編譯錯(cuò)誤。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // create a lot of coroutines and list their jobs
launch(CommonPool) {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
這個(gè)例子比較搞鸳粉,啟動(dòng)100K的協(xié)程扔涧,如果你像作者一樣,2G內(nèi)存的渣機(jī)可能直接out-of-memory error届谈,像筆者這樣的8G大內(nèi)存枯夜,是沒有一點(diǎn)問題的。輕松愉快500ms執(zhí)行完畢疼约。
這個(gè)例子也是為了展示協(xié)程的輕量級(jí)和強(qiáng)悍卤档,線程別說100K,就算10K,你的CPU和內(nèi)存分分鐘炸了程剥,只能重啟劝枣。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (isActive) { // cancellable computation loop
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime = currentTime + 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
delay(1300L) // delay a bit to see if it was cancelled....
println("main: Now I can quit.")
}
delay的例子太多,單獨(dú)講一個(gè)织鲸。啟動(dòng)了一個(gè)協(xié)程任務(wù)去計(jì)算當(dāng)前的時(shí)間舔腾,然后你會(huì)發(fā)現(xiàn)協(xié)程內(nèi)置了一個(gè)isActive屬性,這也是線程內(nèi)部唯三的三大內(nèi)置屬性之一搂擦。其他的兩個(gè)為context和coroutineContext稳诚,不過context已經(jīng)被放棄了,大概是作者覺得context瀑踢,詞不達(dá)意吧扳还,從這點(diǎn)也可以發(fā)現(xiàn)kotlin不會(huì)隨意的刪除api才避,而是通過重命名,重載的方式提供新的氨距。
isActive:如果協(xié)程處于存活或任務(wù)未完成桑逝,狀態(tài)就返回true,如果取消或已完成俏让,則返回false
例子的意思也很明顯告訴你如果任務(wù)在delay時(shí)間內(nèi)未被cancel則一直計(jì)算下去并打印三次I'm sleeping楞遏,然后任務(wù)被cancel,協(xié)程取消首昔。主線程輸出main: Now I can quit
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
run(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
delay(1300L) // delay a bit to ensure it was cancelled indeed
println("main: Now I can quit.")
}
這個(gè)例子講的不可取消寡喝, run(NonCancellable)+finally=絕對(duì)執(zhí)行的代碼
run(NonCancellable)協(xié)程內(nèi)部啟動(dòng)一個(gè)新的協(xié)程,并且不能取消勒奇,霸道總裁般的代碼
run...{}內(nèi)可以使用coroutineContext预鬓,跟上一級(jí)的協(xié)程塊代碼做交互。
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
repeat(1000) :迭代器,輸入要迭代的次數(shù):1000次
withTimeout(1300L) 時(shí)間1.3秒撬陵。
這里講這個(gè)wiathTimeout主要是為了控制協(xié)程的超時(shí)時(shí)間珊皿,避免協(xié)程,一直在活動(dòng)巨税。雖然便宜蟋定,不代表能讓任務(wù)一直執(zhí)行下去,到了超時(shí)的時(shí)間會(huì)直接拋出異常
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
三個(gè)例子
measureTimeMillis :返回代碼塊的執(zhí)行耗時(shí)草添,比起java驶兜,話就是少,就是這么屌
CoroutineStart:協(xié)程的執(zhí)行模式(async和launch都可以用)
LAZY
懶加載
DEFAULT 默認(rèn)的模式
默認(rèn) - 根據(jù)其上下文立即執(zhí)行远寸。
ATOMIC
根據(jù)其上下文原則(不可取消)計(jì)劃協(xié)調(diào)執(zhí)行抄淑。
跟[DEFAULT]類似,但協(xié)程在開始執(zhí)行前無法取消驰后。
UNDISPATCHED
未分派:暫不明白用途
println("The answer is ${one.await() + two.await()}")
kotlin執(zhí)行計(jì)算可在字符串中一起計(jì)算
.await實(shí)際拿到的是協(xié)程返回的值肆资,在例子中也就是13和29
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 20
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 20
}
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
doSomethingUsefulOne()
}
// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
doSomethingUsefulTwo()
}
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = asyncSomethingUsefulOne()
val two = asyncSomethingUsefulTwo()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
runBlocking{}是個(gè)同步非阻塞的代碼塊執(zhí)行器,能統(tǒng)一拿到coroutines的返回值灶芝,支持泛型和接受返回參郑原,多個(gè)或單個(gè)協(xié)程一旦啟動(dòng)后我們要拿返回值不僅可以用await,也可以用runBlocking
var result= runBlocking<Int> {
var resultint = one.await() + two.await()
println("The answer is resultint="+resultint)
//基本類型直接這樣寫就可以
resultint
}
println(result)
============================================================================
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
介紹了多個(gè)調(diào)度器
launch(Unconfined)
launch(coroutineContext):這個(gè)調(diào)度器只有在runBlocking內(nèi)部才能用夜涕,嚴(yán)格來說不算調(diào)度器犯犁,內(nèi)部協(xié)程的下上文中,繼續(xù)啟動(dòng)協(xié)程
launch(CommonPool)
launch(newSingleThreadContext("MyOwnThread"))
具體解釋看開篇的說明
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
這一句將會(huì)在新的協(xié)程中打印出來女器,因?yàn)閰f(xié)程本身被delay了
private val log = LoggerFactory.getLogger(X::class.java)
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log.info("I'm computing a piece of the answer")
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log.info("I'm computing another piece of the answer")
log("I'm computing a piece of the answer")
7
}
log.info("The answer is ${a.await() * b.await()}")
}
這里要講的是日志:如果你是lombok的使用者酸役,那么很遺憾,lombox現(xiàn)在暫不支持在kotlin使用@Slf4j或者@Log4j
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
這一句是官方示例給的,最好別用
private val log = LoggerFactory.getLogger(X::class.java)
跟以前一樣用LoggerFactory拿就好了
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext.get(Job.Key)}")
println("My job is ${coroutineContext.get(Job)}")
println("My job is ${coroutineContext[Job]}")
}
runBlocking<Unit> 這個(gè)老伙計(jì)了涣澡,老伙計(jì)本身其實(shí)也是coroutines啟動(dòng)的沒想到吧贱呐,驚不驚喜,意不意外暑塑。這種設(shè)計(jì)就跟golang一樣吼句,有個(gè)統(tǒng)一的runtime管理器,但這里是顯式的事格。
它被設(shè)計(jì)出來最大的原因就是阻塞執(zhí)行了,在它內(nèi)部可以啟動(dòng)多個(gè)async協(xié)程搞隐,然后共同計(jì)算出一個(gè)復(fù)雜的對(duì)象驹愚,然后統(tǒng)一返回給runBlocking,外部就可以直接接收
maven配置
其實(shí)可以直接引用kotlinx-coroutines-core劣纲,不過它的依賴項(xiàng)會(huì)晚于官方的發(fā)布版本所以我們先排除它的依賴自己引用最新版的kotlin
kotlin-stdlib-jre8或者kotlin-stdlib-jre7
或者直接就用kotlin-stdlib都是可以的逢捺。
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>1.1.3-2</version>
</dependency>
<!-- <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jre8</artifactId>
<version>1.1.3-2</version>
</dependency>-->
<!-- < <dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jre7</artifactId>
<version>1.1.3-2</version>
</dependency>-->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
<version>1.1.3-2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>kotlin-stdlib</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>0.17</version>
<exclusions>
<exclusion>
<artifactId>kotlin-stdlib</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
</exclusion>
</exclusions>
</dependency>
plugin
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>1.1.3-2</version>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>testCompile</id>
<phase>test-compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
關(guān)于Thread.sleep,這個(gè)最好不要在使用了coroutines后繼續(xù)用了,
如果是你配合spring-retry這種線程sleep的框架更要注意癞季,高并發(fā)的情況下如果線程sleep劫瞳,可能會(huì)導(dǎo)致線程無法喚醒,整個(gè)應(yīng)用處理不了請(qǐng)求
今天就聊到這绷柒,delay一下志于。歡迎留言交流關(guān)于kotlin.coroutines的問題
kotlinx.coroutines
下一章,應(yīng)該是下周
轉(zhuǎn)載請(qǐng)聯(lián)系我本人授權(quán)