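For work reasons, I recently had to deal with a large-volume data job: mainly generating hundreds of millions of test records and submitting them to an index server. The program was unacceptably slow, so I looked for ways to make it faster.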
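When it comes to multithreaded programming in Java, the first thing that comes to mind is the classic recipe: extend Thread, override the run method, create a Thread object, and call start.
Threads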
package com.github.yfor.bigdata.tdg;
/**
* Created by wq on 2017/5/14.
*/
public class TestThread extends Thread {
public TestThread() {
}
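@Override
public void run() {
// work executed on the new thread
}
public static void main(String[] args) {
TestThread tt= new TestThread();
tt.start(); // start() launches a new thread that calls run(); calling run() directly would not
// other work on the main thread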
}
}
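This approach is already concurrent. I won't cover the thread lifecycle or methods such as join and yield here; a good book is a more reliable source for that. For many people, learning about multithreading probably stops at this point.
The most primitive approach has some problems. Surprisingly, many people use it in production and are quite proud of it (I used to be one of them). How many threads should be created? Who cares. But if you keep creating threads, they will grab a large share of system resources. Of course, anyone with a bit of experience will suggest using a thread pool.
Using a thread pool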
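The main method above starts the thread and then moves on without waiting for it. The following minimal sketch is not from the original post; it only shows what join, mentioned above, is for. It starts several workers, and the main thread waits for all of them before reading a shared counter. The class and method names are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class JoinDemo {
    /** Starts `threads` workers that each add `perThread` to a shared counter, then waits for all of them. */
    static long runWorkers(int threads, int perThread) throws InterruptedException {
        AtomicLong counter = new AtomicLong(); // thread-safe counter shared by all workers
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet();
                }
            });
            workers[t].start(); // runs the lambda on a new thread
        }
        for (Thread w : workers) {
            w.join(); // block until this worker has finished
        }
        return counter.get(); // safe to read: every worker has terminated
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(4, 1000)); // prints 4000
    }
}
```

Without the join loop, main could read the counter while workers are still running and see a smaller number.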
// Executors.newCachedThreadPool(); // creates threads on demand; the maximum pool size is Integer.MAX_VALUE
// Executors.newSingleThreadExecutor(); // a single worker thread
// Executors.newFixedThreadPool(int); // a fixed number of worker threads
class MyTask implements Runnable {
public MyTask() {
}
@Override
public void run() {
//do something
}
}
ExecutorService executor = Executors.newFixedThreadPool(5);
MyTask myTask = new MyTask();
executor.execute(myTask);
executor.shutdown(); // stop accepting new tasks; already submitted tasks still run
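The snippet above fires a task with execute and does not collect a result. Below is a minimal sketch, not from the original post, of the other common pattern. Tasks are submitted as Callable, their Future results are collected, and the pool is shut down at the end. Summing a number range is only a stand-in for real work, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FixedPoolDemo {
    /** Sums 1..n by splitting the range into about `parts` tasks run on a fixed pool of `threads` workers. */
    static long parallelSum(long n, int parts, int threads) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            long chunk = (n + parts - 1) / parts; // ceiling division so the last chunk picks up the remainder
            for (long start = 1; start <= n; start += chunk) {
                final long from = start;
                final long to = Math.min(n, start + chunk - 1);
                Callable<Long> task = () -> {
                    long s = 0;
                    for (long i = from; i <= to; i++) {
                        s += i;
                    }
                    return s;
                };
                futures.add(executor.submit(task)); // submit() returns a Future, unlike execute()
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until that task has finished
            }
            return total;
        } finally {
            executor.shutdown(); // otherwise the pool's non-daemon threads keep the JVM alive
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parallelSum(1_000_000, 8, 4)); // prints 500000500000
    }
}
```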
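As for how much data to submit per request, there is a trade-off. Smaller batches make each request faster, but they mean more requests, so the total time grows. Submitting too much at once makes each request very slow, possibly to the point of failing. After repeated testing, a few thousand to ten thousand records per batch turned out to be acceptable.
Which thread pool should I choose: a fixed-size one or one that grows without limit? And what happens when more work arrives than the pool can handle? Note that the factory methods above do not throw in that case. newFixedThreadPool and newSingleThreadExecutor put extra tasks into an unbounded queue, which can exhaust memory, and newCachedThreadPool keeps creating new threads. With the default AbortPolicy, a RejectedExecutionException is only thrown once a pool with a bounded queue has reached its maximum thread count and its queue is full.
More experienced folks will scoff and say that a blocking thread pool is the more reliable option. One example is a pool with a wait queue backed by a blocking queue. A minor drawback is that it still keeps creating threads (up to the maximum pool size), which doesn't feel entirely reasonable either.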
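The batching trade-off above comes down to simple arithmetic. At 5,000 records per batch, 100 million records means 100,000,000 / 5,000 = 20,000 submit requests. Below is a minimal sketch of cutting a list into fixed-size batches. The batch size of 5,000 is just one value inside the "few thousand to ten thousand" range, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class Batches {
    /** Splits `records` into consecutive batches of at most `batchSize` elements. */
    static <T> List<List<T>> split(List<T> records, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(records.size(), from + batchSize);
            batches.add(new ArrayList<>(records.subList(from, to))); // copy so a batch does not depend on the source list
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 23_000; i++) records.add(i);
        List<List<Integer>> batches = split(records, 5_000);
        System.out.println(batches.size());        // prints 5
        System.out.println(batches.get(4).size()); // prints 3000
    }
}
```

For hundreds of millions of records you would more likely generate and submit one batch at a time instead of materializing every batch in memory. The slicing logic stays the same.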
Thread pool with a bounded queue
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5)); // 5 core threads, at most 10, 200 ms idle timeout, room for 5 queued tasks
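With a bounded pool like the one above, the rejection policy decides what happens to the overflow. The sketch below shrinks the pool to one thread and a one-slot queue so that the outcome is deterministic. Under the default AbortPolicy, execute() throws once the worker is busy and the queue is full. Under ThreadPoolExecutor.CallerRunsPolicy, the overflowing task runs on the submitting thread instead, which naturally slows the producer down. Both policies are part of the JDK; the demo class and its numbers are only illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {
    /** AbortPolicy (default): 1 running + 1 queued task fit; every further execute() is rejected. */
    static int countRejectedWithAbortPolicy(int tasks) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the latch is released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return rejected;
    }

    /** CallerRunsPolicy: an overflowing task runs on the caller's thread, so nothing is lost. */
    static int countCompletedWithCallerRuns(int tasks) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + countRejectedWithAbortPolicy(5));   // prints "rejected: 3"
        System.out.println("completed: " + countCompletedWithCallerRuns(20)); // prints "completed: 20"
    }
}
```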
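A few days earlier I happened to be reading a book on multithreading. It suddenly struck me that my program was an obvious fit for the producer-consumer pattern, so I reworked it accordingly.
Main program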
package com.github.yfor.bigdata.tdg;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by wq on 2017/5/14.
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
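int threadNum = Runtime.getRuntime().availableProcessors() * 2; // number of consumer threads
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100); // bounded: put() blocks when full
ExecutorService executor = Executors.newFixedThreadPool(threadNum); // one pool thread per consumer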
for (int i = 0; i < threadNum; i++) {
executor.execute(new Consumerlocal(queue));
}
Thread pt = new Thread(new Producerlocal(queue));
pt.start();
pt.join();
for (int i = 0; i < threadNum; i++) {
queue.put("poisonpill");
}
executor.shutdown();
executor.awaitTermination(10L, TimeUnit.DAYS);
}
}
Producer
package com.github.yfor.bigdata.tdg;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by wq on 2017/5/14.
*/
public class Producerlocal implements Runnable {
ArrayBlockingQueue<String> queue;
public Producerlocal(ArrayBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 1000; i++) {
queue.put("s" + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag so the owner of this thread can see it
}
}
}
Consumer
package com.github.yfor.bigdata.tdg;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by wq on 2017/5/14.
*/
public class Consumerlocal implements Runnable {
ArrayBlockingQueue<String> queue;
public Consumerlocal(ArrayBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
final String take = queue.take();
if ("poisonpill".equals(take)) {
return;
}
//do something
System.out.println(take);
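} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the flag and stop; otherwise the interrupt is swallowed and the loop keeps running
return;
}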
}
}
}
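The program uses a bounded blocking queue. put blocks when the queue is full, and take blocks when it is empty; if you are curious, have a look at the JDK source. The number of consumer threads is twice the number of CPU cores. To use these classes well, read the documentation and write test code. There is also a small trick for stopping the threads: put enough poison pills into the queue, one per consumer.
With the new pattern in place, the program became noticeably faster, and that is essentially all there is to the producer-consumer pattern. The next time your program needs multithreading and fits this pattern, dropping it in is a good choice. What you can do right now is roll up your sleeves and write some test code to get a feel for the pattern.
Even so, the program still spent most of its time on HTTP requests, so the running time remained unacceptable. I thought of switching from blocking HTTP to asynchronous I/O to speed things up. The problem was that the HTTP client in this project had been modified for security verification, with encrypted authentication and single sign-on. Adapting a new client to that would be hard and would probably take a while, and I was worried I couldn't pull it off. As for asynchronous non-blocking I/O, my earlier experience choosing data structures suggests that non-blocking is not necessarily better. To be honest, I didn't really understand how to use it together with multiple threads, so I can't say what effect it would have had.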
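The mechanics described above can be condensed into one self-contained sketch: a bounded queue, a fixed pool with one thread per consumer, and exactly one poison pill per consumer. It differs from the original code in one assumed detail. The pill is a dedicated String object compared by identity, not the literal "poisonpill" compared with equals, so real data can never be mistaken for the stop signal. Counting items stands in for the real work, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {
    // Dedicated sentinel compared by identity: no produced record can be this exact object.
    private static final String POISON = new String("POISON");

    /** Produces `items` records, consumes them with `consumers` threads, returns how many were consumed. */
    static int run(int items, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // put() blocks while full
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers); // one thread per consumer
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // blocks while empty
                        if (item == POISON) {
                            return; // this consumer is done
                        }
                        consumed.incrementAndGet(); // stand-in for the real work, e.g. an HTTP submit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int i = 0; i < items; i++) {
            queue.put("s" + i); // the producer runs on the calling thread in this sketch
        }
        for (int c = 0; c < consumers; c++) {
            queue.put(POISON); // exactly one pill per consumer
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int consumers = Runtime.getRuntime().availableProcessors() * 2;
        System.out.println(run(1000, consumers)); // prints 1000
    }
}
```

Each consumer exits after taking exactly one pill, and pills are only enqueued after all data. Every record is therefore consumed before the last consumer stops.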
Maven dependencies
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.3</version>
</dependency>
Asynchronous HTTP
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package com.github.yfor.bigdata.tdg;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncCharConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
/**
* This example demonstrates a pipelined execution of multiple HTTP request / response exchanges
* with a full content streaming.
*/
public class MainPhttpasyncclient {
public static void main(final String[] args) throws Exception {
CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
try {
httpclient.start();
HttpHost targetHost = new HttpHost("httpbin.org", 80);
HttpGet[] requests = {
new HttpGet("/"),
new HttpGet("/ip"),
new HttpGet("/headers"),
new HttpGet("/get")
};
List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
for (HttpGet request : requests) {
requestProducers.add(new MyRequestProducer(targetHost, request));
responseConsumers.add(new MyResponseConsumer(request));
}
Future<List<Boolean>> future = httpclient.execute(
targetHost, requestProducers, responseConsumers, null);
future.get();
System.out.println("Shutting down");
} finally {
httpclient.close();
}
System.out.println("Done");
}
static class MyRequestProducer extends BasicAsyncRequestProducer {
private final HttpRequest request;
MyRequestProducer(final HttpHost target, final HttpRequest request) {
super(target, request);
this.request = request;
}
@Override
public void requestCompleted(final HttpContext context) {
super.requestCompleted(context);
System.out.println();
System.out.println("Request sent: " + this.request.getRequestLine());
System.out.println("=================================================");
}
}
static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
private final HttpRequest request;
MyResponseConsumer(final HttpRequest request) {
this.request = request;
}
@Override
protected void onResponseReceived(final HttpResponse response) {
System.out.println();
System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
System.out.println("=================================================");
}
@Override
protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
while (buf.hasRemaining()) {
buf.get();
}
}
@Override
protected void releaseResources() {
}
@Override
protected Boolean buildResult(final HttpContext context) {
System.out.println();
System.out.println("=================================================");
System.out.println();
return Boolean.TRUE;
}
}
}
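I could run it on the server itself to reduce network I/O and use the full power of its 16-core CPU (multiple consumer threads), but the result was still not ideal.
Later I asked a colleague for advice. My program was still a single-machine design, while our receiving servers formed a cluster. I changed the program to look up the IPs of the cluster's nodes in ZooKeeper and send requests to several of them (multiple receiving nodes). I also raised the commit time in the request parameters so the server does soft commits instead of committing in real time. With these changes the program became noticeably faster and reached an acceptable speed.
We also have another version of the program that uses Kafka, a distributed queue, to take the producer-consumer approach all the way.
Configuration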
package com.github.yfor.bigdata.tdg;
/**
* Created by wq on 2017/4/29.
*/
public interface KafkaProperties {
final static String zkConnect = "localhost:2181";
final static String groupId = "group21";
final static String topic = "topic4";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
final static String clientId = "SimpleConsumerDemoClient";
}
Setting up Kafka takes some time; follow the official documentation to install and run it.
Producer thread
package com.github.yfor.bigdata.tdg;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final boolean isAsync;
private final int size; // number of partitions of the topic
public Producer(String topic) {
Properties props = new Properties();
// producer settings are keyed by ProducerConfig constants
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(props);
this.topic = topic;
this.isAsync = true;
this.size = producer.partitionsFor(topic).size();
}
@Override
public void run() {
int messageNo = 1;
while (messageNo < 100) {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously; the partition is chosen explicitly as messageNo % size
producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
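++messageNo;
}
producer.close(); // flush buffered asynchronous sends and release resources before the thread ends
}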
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
Consumer thread
package com.github.yfor.bigdata.tdg;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
private final int size;
public KafkaConsumer(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
this.size = 5;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
try {
sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // interrupted before starting: restore the flag and exit
return;
}
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, size); // number of streams (consumer threads) for this topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ExecutorService executor = Executors.newFixedThreadPool(size);
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new KafkaConsumerThread(stream));
}
}
}
class KafkaConsumerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;
public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> mam = it.next();
System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
+ "offset[" + mam.offset() + "], " + new String(mam.message(), java.nio.charset.StandardCharsets.UTF_8));
}
}
}
The sample code here is adapted from several blog posts and supports processing with multiple consumers.
Producer client
package com.github.yfor.bigdata.tdg;
public class MainP {
public static void main(String[] args) {
Producer producerThread = new Producer(KafkaProperties.topic);
producerThread.start();
}
}
Consumer client
package com.github.yfor.bigdata.tdg;
public class MainC {
public static void main(String[] args) {
KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}
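To keep the sample code easy to follow, I trimmed it down and removed the business logic. I hope you will see the general pattern behind it rather than a toy example. I suggest running these examples and experimenting with them yourself. Adding a distributed message queue to your toolbox will also make you somewhat more competitive on the job market.
If some of it isn't clear, don't worry. I haven't mastered some of the details myself, and I only got a feel for them by using them in practice.
References
- Seven Concurrency Models in Seven Weeks
- Apache Solr Reference Guide
- Apache HttpAsyncClient documentation
- Apache Kafka official documentation
- Various articles found online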