Getting Started with Concurrency: A Producer-Consumer Implementation in Java


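For work, I recently had to handle a very large volume of data: the main job was generating hundreds of millions of test records and submitting them to an index server. The program was unacceptably slow, so I looked for ways to speed it up.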

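When it comes to multithreaded programming in Java, the first approach that comes to mind is to extend Thread, override the run method, create a new Thread object, and call start.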

Threads


package com.github.yfor.bigdata.tdg;

/**
 * Created by wq on 2017/5/14.
 */
public class TestThread extends Thread {
    public TestThread() {
    }

    public void run() {

    }
    public static void main(String[] args) {
        TestThread tt= new TestThread();
        tt.start();
        //一些操作
    }
}




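This approach is already concurrent. I won't cover the thread lifecycle, join, yield and so on here; a good book is a more reliable source for those. For many people, learning about multithreading probably stops at this point.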

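This most primitive approach has problems, yet many people use it in production and are even proud of it (I used to be one of them). How many threads should be created? Who cares. But if threads keep being created, they grab a large share of system resources. Anyone with some experience will suggest using a thread pool instead.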

Using a thread pool

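// Executors.newCachedThreadPool();      // cached pool: creates threads on demand, maximum pool size Integer.MAX_VALUE
// Executors.newSingleThreadExecutor();  // a single worker thread
// Executors.newFixedThreadPool(int);    // a fixed number of worker threads

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MyTask implements Runnable {

    public MyTask() {
    }

    @Override
    public void run() {
        // do something
    }
}

// Usage, inside some method:
ExecutorService executor = Executors.newFixedThreadPool(5);
MyTask myTask = new MyTask();
executor.execute(myTask);
executor.shutdown(); // stop accepting new tasks once everything has been submitted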

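Then there is the batch size per submission. A smaller batch finishes faster, but you need more submissions, so the total time grows. A batch that is too large runs very slowly and may even fail. After repeated testing, a few thousand to ten thousand records per batch turned out to be acceptable.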
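The following is a minimal sketch of splitting generated records into fixed-size batches before submission. The class name `Batcher` is hypothetical. The batch size of 5,000 is only an illustrative value inside the range reported above, not a recommendation.

```java
import java.util.ArrayList;
import java.util.List;

class Batcher {
    // Splits `records` into consecutive batches of at most `batchSize` elements.
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            // copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 12_345; i++) {
            records.add(i);
        }
        List<List<Integer>> batches = toBatches(records, 5_000);
        System.out.println(batches.size()); // prints 3 (5000 + 5000 + 2345)
    }
}
```

Each batch can then be wrapped in a task and handed to the thread pool, so the batch size and the thread count can be tuned independently.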

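Which thread pool should you pick: a fixed-size one, or one that grows without bound? And what happens when work arrives faster than the pool can handle it? The Executors factory pools do not simply throw. newFixedThreadPool and newSingleThreadExecutor put waiting tasks in an unbounded queue, so memory use can grow without limit. newCachedThreadPool keeps creating new threads instead. A RejectedExecutionException is thrown only when a pool with a bounded queue is saturated (under the default AbortPolicy) or after the pool has been shut down.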

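More experienced people will say, somewhat dismissively, that a blocking thread pool is the reasonably reliable option: add a waiting queue and make it a blocking queue. A minor drawback is that the pool keeps creating threads, which doesn't feel entirely reasonable either.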

A thread pool with a queue

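// core pool size 5, maximum pool size 10, idle non-core threads kept for 200 ms, bounded queue of 5 tasks
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));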
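The sketch below shows what the bounded queue changes. The class `BoundedPoolDemo` is hypothetical. It uses the same 5 / 10 / 200 ms / queue-of-5 settings and keeps every task blocked on a latch, so nothing finishes during submission.

Under these assumptions, ThreadPoolExecutor fills capacity in a fixed order:

1. The first 5 tasks start core threads.
2. The next 5 tasks wait in the queue.
3. The next 5 tasks start extra threads, up to the maximum of 10.
4. Anything after that is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
    // Submits `tasks` blocked tasks to a 5/10/queue-5 pool and returns how many were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // hold the worker until all submissions are done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // pool at max size and queue full
            }
        }
        release.countDown(); // let every accepted task finish
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(20)); // prints 5: 5 core + 5 queued + 5 extra = 15 accepted
    }
}
```

If you would rather slow the submitter down than get an exception, pass `new ThreadPoolExecutor.CallerRunsPolicy()` as the last constructor argument. The submitting thread then runs the overflow task itself, which naturally throttles how fast new work is submitted.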

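A few days earlier I happened to be reading a book on multithreading, and it suddenly struck me that my program was an obvious fit for the producer-consumer pattern. So I reworked it.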



Main program



package com.github.yfor.bigdata.tdg;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by wq on 2017/5/14.
 */
public class Main {

    public static void main(String[] args) throws InterruptedException {
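        // number of consumer threads: twice the number of CPU cores
        int threadNum = Runtime.getRuntime().availableProcessors() * 2;
        // bounded queue: put() blocks when it is full, take() blocks when it is empty
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        // size the pool to threadNum so that every consumer actually runs concurrently
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executor.execute(new Consumerlocal(queue));
        }
        Thread pt = new Thread(new Producerlocal(queue));
        pt.start();
        pt.join(); // wait until the producer has queued every item
        // one poison pill per consumer: each consumer exits on the first pill it takes
        for (int i = 0; i < threadNum; i++) {
            queue.put("poisonpill");
        }

        executor.shutdown();
        executor.awaitTermination(10L, TimeUnit.DAYS);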
    }
}

Producer


package com.github.yfor.bigdata.tdg;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by wq on 2017/5/14.
 */
public class Producerlocal implements Runnable {


    ArrayBlockingQueue<String> queue;

    public Producerlocal(ArrayBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        try {
            for (int i = 0; i < 1000; i++) {
                queue.put("s" + i);
            }

        } catch (InterruptedException e) {
            // restore the interrupt flag so the owner of this thread can see it
            Thread.currentThread().interrupt();
        }
    }
}

Consumer

package com.github.yfor.bigdata.tdg;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by wq on 2017/5/14.
 */
public class Consumerlocal implements Runnable {

    ArrayBlockingQueue<String> queue;

    public Consumerlocal(ArrayBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
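        while (true) {
            try {
                // blocks while the queue is empty
                final String take = queue.take();

                if ("poisonpill".equals(take)) {
                    return; // poison pill received: this consumer exits
                }
                //do something
                System.out.println(take);
            } catch (InterruptedException e) {
                // restore the interrupt flag and exit instead of looping forever
                Thread.currentThread().interrupt();
                return;
            }
        }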
    }
}

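The program uses a blocking queue with a fixed capacity. Adding to a full queue blocks, and taking from an empty queue also blocks; if you are curious, read the JDK source. The number of consumer threads is twice the number of CPU cores. For these classes, read the API documentation and write some test code. Deciding when threads should end also takes a small trick: add enough poison pills, one per consumer.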
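The following is a self-contained sketch of the same producer-consumer shutdown, collapsed into one hypothetical class `PoisonPillDemo` so it can be run and checked directly. It differs from the code above in one deliberate way. The pill is a dedicated String instance compared with `==` rather than `equals`, so a real message that happens to read "poisonpill" cannot stop a consumer by accident.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {
    // Dedicated sentinel instance, compared by identity, so it cannot collide with real data.
    private static final String POISON = new String("poisonpill");

    // Produces `items` messages, drains them with `consumers` threads, returns how many were consumed.
    static int run(int items, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String msg = queue.take();  // blocks while the queue is empty
                        if (msg == POISON) {
                            return;                 // identity check on the sentinel
                        }
                        consumed.incrementAndGet(); // stand-in for real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (int i = 0; i < items; i++) {
                queue.put("s" + i);                 // blocks while the queue is full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON);                  // one pill per consumer
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4)); // prints 1000
    }
}
```

The pills go in only after every real item, and the queue is FIFO. So no consumer stops while data is still waiting, and every item is counted before the pool terminates.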

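With the new pattern in place, the program became noticeably faster, and that is essentially all there is to the producer-consumer pattern. Next time your program needs multithreading and fits this pattern, applying it is a good choice. What you can do right now is roll up your sleeves, write some test code, and get a feel for the pattern.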

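Most of the program's time was still spent on HTTP requests, so the running time was still unacceptable. My next idea was asynchronous I/O instead of blocking HTTP. The problem was that the HTTP client had been modified for security, with encrypted authentication and single sign-on. Adapting a new client to that would be hard and time-consuming, and I was afraid I couldn't pull it off. As for asynchronous non-blocking I/O, my earlier experience with choosing the batch size suggested that non-blocking is not automatically better. To be honest, I didn't really understand how to use it from multiple threads, so I can't say what it would have gained.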

Maven dependencies

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.3</version>
        </dependency>

Asynchronous HTTP

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package com.github.yfor.bigdata.tdg;

import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncCharConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/**
 * This example demonstrates a pipelined execution of multiple HTTP request / response exchanges
 * with a full content streaming.
 */
public class MainPhttpasyncclient {

    public static void main(final String[] args) throws Exception {
        CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
        try {
            httpclient.start();

            HttpHost targetHost = new HttpHost("httpbin.org", 80);
            HttpGet[] requests = {
                    new HttpGet("/"),
                    new HttpGet("/ip"),
                    new HttpGet("/headers"),
                    new HttpGet("/get")
            };

            List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
            List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
            for (HttpGet request : requests) {
                requestProducers.add(new MyRequestProducer(targetHost, request));
                responseConsumers.add(new MyResponseConsumer(request));
            }

            Future<List<Boolean>> future = httpclient.execute(
                    targetHost, requestProducers, responseConsumers, null);
            future.get();
            System.out.println("Shutting down");
        } finally {
            httpclient.close();
        }
        System.out.println("Done");
    }

    static class MyRequestProducer extends BasicAsyncRequestProducer {

        private final HttpRequest request;

        MyRequestProducer(final HttpHost target, final HttpRequest request) {
            super(target, request);
            this.request = request;
        }

        @Override
        public void requestCompleted(final HttpContext context) {
            super.requestCompleted(context);
            System.out.println();
            System.out.println("Request sent: " + this.request.getRequestLine());
            System.out.println("=================================================");
        }
    }

    static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {

        private final HttpRequest request;

        MyResponseConsumer(final HttpRequest request) {
            this.request = request;
        }

        @Override
        protected void onResponseReceived(final HttpResponse response) {
            System.out.println();
            System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
            System.out.println("=================================================");
        }

        @Override
        protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
            while (buf.hasRemaining()) {
                buf.get();
            }
        }

        @Override
        protected void releaseResources() {
        }

        @Override
        protected Boolean buildResult(final HttpContext context) {
            System.out.println();
            System.out.println("=================================================");
            System.out.println();
            return Boolean.TRUE;
        }

    }

}

Running the program on the server reduced I/O overhead and let it use all 16 CPU cores (*multiple consumer threads*), but the result still wasn't good enough.

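Later I asked a colleague for advice. My program was effectively single-machine, while our receiving servers run as a cluster. So I changed the program to look up the IPs of the cluster's nodes in ZooKeeper and send requests to several nodes (*multiple receiving nodes acting as consumers*). I also increased the commit time in the request parameters so the servers did soft commits instead of committing in real time. With that, the program became clearly faster and reached an acceptable speed.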
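The post does not show this version's code, so what follows is only a sketch under stated assumptions. It assumes the receiving servers are Solr, which the references below suggest. The node list is hard-coded to stand in for the ZooKeeper lookup. The class name `RoundRobinNodes`, the collection name and the addresses are all hypothetical. Consumers spread requests across nodes round-robin, and each update URL carries Solr's `commitWithin` parameter so the server commits within a time window instead of on every request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class RoundRobinNodes {
    private final List<String> nodes;            // base URLs, e.g. as read from ZooKeeper (hypothetical here)
    private final AtomicLong counter = new AtomicLong();

    RoundRobinNodes(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("need at least one node");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // Thread-safe: consumers share one counter, so requests rotate evenly across nodes.
    String next() {
        return nodes.get((int) Math.floorMod(counter.getAndIncrement(), (long) nodes.size()));
    }

    // Solr-style update URL asking the server to commit within `commitWithinMs` milliseconds
    // rather than committing immediately (collection name is an assumption).
    static String updateUrl(String baseUrl, String collection, int commitWithinMs) {
        return baseUrl + "/solr/" + collection + "/update?commitWithin=" + commitWithinMs;
    }

    public static void main(String[] args) {
        RoundRobinNodes rr = new RoundRobinNodes(Arrays.asList(
                "http://10.0.0.1:8983", "http://10.0.0.2:8983", "http://10.0.0.3:8983"));
        for (int i = 0; i < 4; i++) {
            System.out.println(updateUrl(rr.next(), "testdata", 60_000)); // node 1, 2, 3, then 1 again
        }
    }
}
```

Round-robin is the simplest way to spread load across nodes. If the nodes differ in capacity, a weighted or least-busy choice would be the natural next step.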

We also have another version of the program that uses Kafka, a distributed message queue, to carry the producer-consumer approach all the way through.

Configuration


package com.github.yfor.bigdata.tdg;

/**
 * Created by wq on 2017/4/29.
 */
public interface KafkaProperties {
    final static String zkConnect = "localhost:2181";
    final static String groupId = "group21";
    final static String topic = "topic4";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;

    final static String clientId = "SimpleConsumerDemoClient";
}

Setting up Kafka takes some time; follow the official documentation to install and run it.

Producer thread

package com.github.yfor.bigdata.tdg;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;
    private final int size; // number of partitions of the topic

    public Producer(String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
        this.isAsync = true;
        // used below to spread messages across partitions explicitly (messageNo % size)
        this.size = producer.partitionsFor(topic).size();

    }

    @Override
    public void run() {
        int messageNo = 1;
        while (messageNo < 100) {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously 異步
                producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously 同步
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
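        }
        // flush any buffered asynchronous sends and release resources before the thread ends
        producer.close();
    }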

}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }

    }


}

Consumer thread


package com.github.yfor.bigdata.tdg;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author leicui bourne_cui@163.com
 */
public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final int size;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
        this.size = 5;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, size); // number of streams (one per consumer thread) for this topic

        // reuse the connector created in the constructor instead of opening a second one
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(size);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }

    }
}

class KafkaConsumerThread implements Runnable {

    private KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }


    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], " + new String(mam.message()));

        }
    }

}

The sample code here borrows from several blog posts and lets multiple consumers process messages in parallel.

Producer client

package com.github.yfor.bigdata.tdg;

public class MainP {

    public static void main(String[] args) {
        Producer producerThread = new Producer(KafkaProperties.topic);
        producerThread.start();


    }
}

Consumer client

package com.github.yfor.bigdata.tdg;

public class MainC {

    public static void main(String[] args) {

        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}

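To keep the samples easy to follow, I trimmed the code and stripped out the business logic. I hope you see the general design pattern rather than a toy. I recommend running the code and experimenting with it, and adding a distributed message queue to your toolbox; it will also make you somewhat more competitive when job hunting.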

Don't worry if some of this is unclear. I haven't fully mastered some of the details myself, and only started to get a feel for them after using them in practice.

References

  • Seven Concurrency Models in Seven Weeks
  • Solr Reference Guide
  • Apache HttpAsyncClient documentation
  • Kafka official documentation
  • Various articles online
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.