假設(shè)服務(wù)器端提供無限的服務(wù)能力先紫,客戶端如何達(dá)到最高的QPS骗绕?如果使用多線程來提高并發(fā),線程數(shù)量會成為瓶頸贮泞。是否可以通過協(xié)程來提高客戶端的并發(fā)能力楞慈?下面做了一組測試,分別使用同步啃擦、異步囊蓝、協(xié)程+同步等方式請求百度的首頁,運(yùn)行10s令蛉,對比一下QPS聚霜。
測試環(huán)境
2017年MBP,2核4線程言询,16GB內(nèi)存俯萎。
Maven依賴
<properties>
<kotlin.version>1.3.11</kotlin.version>
</properties>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>khttp</groupId>
<artifactId>khttp</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>io.github.rybalkinsd</groupId>
<artifactId>kohttp</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.12.1</version>
</dependency>
Kotlin多線程同步
使用Kotlin的khttp發(fā)起同步請求。
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
public class TestHttpSyncClient {
val api = "https://www.baidu.com"
val completedCount = AtomicInteger(0)
var failedCount = AtomicInteger(0)
@Test
fun test() {
for (i in 0..80) {
Thread() {
while (true) {
try {
var response = khttp.get(api)
if (response.statusCode == 200) {
completedCount.incrementAndGet()
} else {
failedCount.incrementAndGet()
}
}catch (e : Exception) {
failedCount.incrementAndGet()
}
}
}.start()
}
Thread.sleep(10 * 1000)
println("completedCount: ${completedCount}, failedCount: ${failedCount}");
System.exit(0);
}
}
線程開到80個(gè)运杭,QPS可達(dá)2600左右夫啊。
completedCount: 26089, failedCount: 111
HttpAsyncClients異步
使用HttpAsyncClients發(fā)起異步請求。
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
public class TestHttpAsyncClient implements Runnable {
public final static AtomicInteger completedCount = new AtomicInteger(0);
public final static AtomicInteger failedCount = new AtomicInteger(0);
public final static AtomicInteger canceledCount = new AtomicInteger(0);
@Test
public void test() {
//啟動線程
TestHttpAsyncClient testHttpAsyncClient = new TestHttpAsyncClient();
new Thread(testHttpAsyncClient).start();
//測試10s
try {
Thread.sleep(10 * 1000);
System.out.println(String.format("completedCount: %d, failedCount: %d, canceledCount: %d",
completedCount.get(), failedCount.get(), canceledCount.get()));
System.exit(0);
} catch (Exception e) {
System.err.println(String.format("System.exit exception: %s", e));
}
}
public void run() {
CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
.setMaxConnTotal(128)
.setMaxConnPerRoute(128)
.build();
httpclient.start();
final HttpGet request = new HttpGet("https://www.baidu.com");
while (true) {
httpclient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
if (httpResponse.getStatusLine().getStatusCode() == 200) {
completedCount.incrementAndGet();
}else {
failedCount.incrementAndGet();
}
}
@Override
public void failed(Exception e) {
failedCount.incrementAndGet();
}
@Override
public void cancelled() {
canceledCount.incrementAndGet();
}
});
}
}
}
并發(fā)設(shè)置為128辆憔,QPS可以達(dá)到2000左右撇眯。
completedCount: 19319, failedCount: 125, canceledCount: 0
通過調(diào)試可以發(fā)現(xiàn)报嵌,只需要4個(gè)dispatcher線程,就可以滿足需要熊榛。
Kotlin協(xié)程+khttp
使用Kotlin Coroutine+khttp同步請求測試锚国。
import khttp.responses.Response
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test
public class TestKotlinCoroutine {
val api = "https://www.baidu.com"
val completedCount = AtomicInteger(0)
val failedCount = AtomicInteger(0)
@Test
fun test() {
Thread(){
while (true) {
val resp =
GlobalScope.launch {
val result: Deferred<Response> = async {
get()
}
result.await()
}
}
}.start()
Thread.sleep(10 * 1000)
println("completedCount: ${completedCount}, failedCount: ${failedCount}");
System.exit(0);
}
suspend fun get(): khttp.responses.Response {
var response = khttp.get(api)
if (response.statusCode == 200) {
completedCount.addAndGet(1)
}else {
failedCount.addAndGet(1)
}
return response;
}
}
協(xié)程版本的性能甚至不如同步版本,QPS只有50玄坦。換一種方式血筑,起多個(gè)協(xié)程,每個(gè)線程里面循環(huán)請求煎楣,QPS也一樣只有50左右豺总。
completedCount: 498, failedCount: 0
這個(gè)版本性能很差,通過調(diào)試可以發(fā)現(xiàn)择懂,最多也就啟動了4個(gè)worker線程喻喳,并且khttp又不支持協(xié)程阻塞。
下面是一個(gè)優(yōu)化的版本困曙,將工作放到Dispatchers.IO
里面去做表伦,盡量多起一些worker線程。
public class TestKotlinCoroutineClient {
val api = "https://www.baidu.com"
val completedCount = AtomicInteger(0)
var failedCount = AtomicInteger(0)
val time = 10000
@Test
fun test() = runBlocking {
val start = System.currentTimeMillis()
val channel = Channel<Int>()
repeat(time) {
launch { channel.send(get()) }
}
repeat(time) {
val code = channel.receive()
if (code == 200) {
completedCount.incrementAndGet()
} else {
failedCount.incrementAndGet()
}
}
val end = System.currentTimeMillis()
println("completedCount: ${completedCount}, failedCount: ${failedCount}, cost${(end - start) / 1000}");
System.exit(0);
}
suspend fun get() = withContext(Dispatchers.IO) { khttp.get(api) }.statusCode
}
性能可以提升到1200左右慷丽。調(diào)試可以發(fā)現(xiàn)線程數(shù)量最多可達(dá)68個(gè)蹦哼。
Kotlin協(xié)程+kohttp
kohttp看起來支持協(xié)程。使用kohttp請求一萬次盈魁,花費(fèi)時(shí)間100s左右翔怎,QPS為100窃诉。
import io.github.rybalkinsd.kohttp.ext.asyncHttpGet
import junit.framework.Assert.assertEquals
import kotlinx.coroutines.runBlocking
import org.junit.Test
import kotlin.system.measureTimeMillis
public class TestKotlinCoroutine {
val api = "https://www.baidu.com"
@Test
fun `many async invokes of httpGet`() {
measureTimeMillis {
GlobalScope.launch {
val tasks = List(10000) {
api.asyncHttpGet()
}
tasks.map { r ->
r.await().also { it.close() }
}.forEach {
assertEquals(200, it.code())
}
}
}.also { println("$it ms") }
}
}
分析一下kohttp的實(shí)現(xiàn)杨耙,可以發(fā)現(xiàn)它只是封裝了okhttp3,使用了異步模式飘痛。okhttp3本身不支持協(xié)程珊膜,所以性能也不會太好。
fun String.asyncHttpGet(client: Call.Factory = defaultHttpClient): Deferred<Response> =
GlobalScope.async(context = Unconfined) {
client.suspendCall(Request.Builder().url(this@asyncHttpGet).build())
}
internal suspend fun Call.Factory.suspendCall(request: Request): Response =
suspendCoroutine { cont ->
newCall(request).enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
cont.resume(response)
}
override fun onFailure(call: Call, e: IOException) {
cont.resumeWithException(e)
}
})
}
Go協(xié)程
順便試一下Go協(xié)程的性能宣脉。Go協(xié)程版本在使用keepalive的情況下车柠,256個(gè)協(xié)程的QPS可以達(dá)到1700左右。注意需要讀一下resp.Body塑猖,這樣keepalive才會生效竹祷。參考文章:A brief intro of TCP keep-alive in Go’s HTTP implementation。
package main
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
)
var completedCount uint64
var failedCount uint64
var lock = &sync.Mutex{}
func main() {
runtime.GOMAXPROCS(4)
for i := 0; i < 256; i++ {
clt := newQPSClient()
go clt.test()
}
time.Sleep(10 * time.Second)
fmt.Printf("completedCount: %d, failedCount: %d\n", completedCount, failedCount)
os.Exit(0)
}
type QPSClient struct {
clt *http.Client
}
func newQPSClient() *QPSClient {
return &QPSClient{
clt: &http.Client{},
}
}
func (qc *QPSClient) test() {
for {
resp, err := qc.clt.Get("https://www.baidu.com")
if err == nil && (resp.StatusCode == 200) {
_, err = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
atomic.AddUint64(&completedCount, 1)
} else {
_, err = io.Copy(ioutil.Discard, resp.Body)
atomic.AddUint64(&failedCount, 1)
}
}
}
結(jié)論
請求方式 | QPS |
---|---|
Kotlin多線程同步 | 80個(gè)線程羊苟,QPS 2600 |
HttpAsyncClients異步 | 最大128個(gè)連接塑陵,QPS 2000 |
Kotlin協(xié)程+khttp | 循環(huán)起協(xié)程+khttp同步,QPS 50 |
Kotlin協(xié)程+kohttp | 循環(huán)起協(xié)程+okhttp3異步蜡励,QPS 100 |
Go協(xié)程 | 256個(gè)協(xié)程令花,QPS 1700 |
上面這些測試阻桅,缺乏更精細(xì)的設(shè)置,比如HttpAsyncClients 128個(gè)連接使用了多少個(gè)線程兼都?Kotlin和Go的協(xié)程版本使用了多少線程嫂沉?還有沒有提升空間?
Coroutine運(yùn)行有很多受限條件扮碧,不能堵塞在操作系統(tǒng)會堵塞線程的地方趟章,需要自己實(shí)現(xiàn)對這些堵塞API的處理,在用戶態(tài)做上下文的保存和恢復(fù)慎王。Java生態(tài)圈中缺乏對協(xié)程支持良好的基礎(chǔ)庫尤揣,導(dǎo)致不能發(fā)揮協(xié)程真正的威力,使用異步IO是更好的解決辦法柬祠。