java多線程編程核心技術(shù)

一,共享資源 使用sleep()觀察數(shù)據(jù)紊亂

注意:以下幾份代碼其中生產(chǎn)者(Producer.java),消費者(Consumer.java),和測試類(TestDemo.java)都完全一樣主要對共享資源文件(Resource.java)操作

Resource.java共享資源

//共享資源對象
public class Resource {
private String name;
private String gender;

// 讓生產(chǎn)者調(diào)用設(shè)置共享資源的成員變量以供消費者的打印操作
public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    this.gender = gender;
}

// 供消費者從共享資源取出數(shù)據(jù)
public void pop() {
    try {

        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(this.name + "-" + this.gender);
}

Producer.java生產(chǎn)者

public class Producer implements Runnable {
public Resource resource = null;

public Producer(Resource resource) {
    this.resource = resource;
}
@Override
public void run() {
    for (int i = 0; i < 100; i++) {
        if (i % 2 == 0) {
            resource.push("鳳姐", "女");
        } else {
            resource.push("春哥", "男");
        }
    }
}

Consumer.java消費者

 public class Consumer implements Runnable {
// 消費者擁有共享資源對象以便實現(xiàn)調(diào)用方法執(zhí)行打印操作
public Resource resource = null;

// Creatr Constructor
public Consumer(Resource resource) {
    this.resource = resource;
}
// 重寫run()方法 執(zhí)行pop()方法打印結(jié)果
@Override
public void run() {
    for (int i = 0; i < 50; i++) {

        resource.pop();
    }
}

TestDemo.java測試代碼

   public class TestDemo {
   public static void main(String[] args) {
    // 創(chuàng)建共享資源對象 開啟線程
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();
    new Thread(new Consumer(resource)).start();
}

分析結(jié)果:鳳姐-男 鳳姐-女 鳳姐-男 發(fā)現(xiàn)性別亂序了
剛開始打印 鳳姐-男 生產(chǎn)者先生產(chǎn)出春哥哥-男,此時消費者沒有消費,生產(chǎn)者繼續(xù)生產(chǎn)出姓名為鳳姐,此時消費者開始消費了.

二,使用同步鎖 避免數(shù)據(jù)紊亂

Resource.java共享資源

//共享資源對象
public class Resource{
private String name;
private String gender;
//生產(chǎn)者向共享資源存儲數(shù)據(jù)
synchronized public void push(String name, String gender)  {
    this.name = name;
    try{
    Thread.sleep(100);
    }catch(InterruptedException e){
        e.printStackTrace();
    }
    this.gender = gender;
}
//  消費者從共享資源對象取數(shù)據(jù)
synchronized public void pop(){
    try{
        Thread.sleep(100);
    }catch(InterruptedException e){
        e.printStackTrace();
    }
    System.out.println(this.name + "-" +this.gender);
}

出現(xiàn)性別紊亂的情況.

  • 解決方案:只要保證在生產(chǎn)姓名和性別的過程保持同步,中間不能被消費者線程進來取走數(shù)據(jù).
  • 可以使用同步代碼塊/同步方法/Lock機制來保持同步性.
三,怎么實現(xiàn)出現(xiàn)生產(chǎn)一個數(shù)據(jù),消費一個數(shù)據(jù).
  • 應(yīng)該交替出現(xiàn): 春哥哥-男-->鳳姐-女-->春哥哥-男-->鳳姐-女.....

  • 解決方案: 使用 等待和喚醒機制.

  • wait():執(zhí)行該方法的線程對象釋放同步鎖,JVM把該線程存放到等待池中,等待其他的線程喚醒該線程.
    notify:執(zhí)行該方法的線程喚醒在等待池中等待的任意一個線程,把線程轉(zhuǎn)到鎖池中等待.
    notifyAll():執(zhí)行該方法的線程喚醒在等待池中等待的所有的線程,把線程轉(zhuǎn)到鎖池中等待.
    注意:上述方法只能被同步監(jiān)聽鎖對象來調(diào)用,否則報錯IllegalMonitorStateException..

Resource.java共享資源

//共享資源對象
public class Resource {
private String name;
private String gender;
private boolean isEmpty = true;// 表示共享資源對象是否為空的狀態(tài) 第一次為空要設(shè)置默認值為true

// 生產(chǎn)者向共享資源對象中存儲數(shù)據(jù)
synchronized public void push(String name, String gender) {

    try {
        while (!isEmpty) { // 當(dāng)共享資源對象有值時 ,不空等著消費者來獲取值 使用同步鎖對象來調(diào)用
            // 表示當(dāng)前線程釋放同步鎖進入等待池只能被其他線程喚醒
            this.wait();
        }

        this.name = name;
        Thread.sleep(100);
        this.gender = gender;
        // 生成結(jié)束
        isEmpty = false;// 設(shè)置共享資源對象為空
        this.notify();// 喚醒一個消費者
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

// 消費者從共享資源對象中取數(shù)據(jù)
synchronized public void pop() {
    try {
        while (isEmpty) {// 當(dāng)前共享資源為空 等待生產(chǎn)者來生產(chǎn)
            // 使用同步鎖對象來調(diào)用此方法 表示當(dāng)前線程釋放同步鎖進入等待池只能被其他線程喚醒
            this.wait();
        }
        // 消費開始
        Thread.sleep(100);
        System.out.println(this.name + "-" + this.gender);
        // 消費結(jié)束
        isEmpty = true;
        // 喚醒其他線程
        this.notify();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
四, 線程通信-使用Lock和Condition接口

wait和notify方法,只能被同步監(jiān)聽鎖對象來調(diào)用,否則報錯IllegalMonitorStateException.
那么現(xiàn)在問題來了,Lock機制根本就沒有同步鎖了,也就沒有自動獲取鎖和自動釋放鎖的概念.
因為沒有同步鎖,所以Lock機制不能調(diào)用wait和notify方法.
解決方案:Java5中提供了Lock機制的同時提供了處理Lock機制的通信控制的Condition接口.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//共享資源對象
public class Resource {
private String name;
private String gender;
private boolean isEmpty = true;
private final Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

// 生產(chǎn)者向共享資源存儲數(shù)據(jù)
public void push(String name, String gender) {
    lock.lock();
    try {
        while (!isEmpty) {
            condition.await();
        }
        // 開始生成
        this.name = name;
        Thread.sleep(100);
        this.gender = gender;
        // 生成結(jié)束
        isEmpty = false;
        condition.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();// 釋放鎖
    }
}

// 消費者向共享資源獲取數(shù)據(jù)
public void pop() {
    lock.lock();
    try {
        while (isEmpty) {
            condition.await();
        }
        Thread.sleep(100);
        System.out.println(this.name + "-" + this.gender);
        // 消費結(jié)束
        isEmpty = true;
        condition.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}
五,線程的生命周期
  • 線程狀態(tài)


    線程狀態(tài)
  • 說法 一


  • 說法 二



    有人又把阻塞狀態(tài),等待狀態(tài),計時等待狀態(tài)合稱為阻塞狀態(tài).


線程對象的狀態(tài)存放在Thread類的內(nèi)部類(State)中:

注意:Thread.State類其實是一個枚舉類.
因為線程對象的狀態(tài)是固定的,只有6種,此時使用枚舉來表示是最恰當(dāng)?shù)?

  • 1: 新建狀態(tài)(new):使用new創(chuàng)建一個線程對象,僅僅在堆中分配內(nèi)存空間,在調(diào)用start方法之前.
    新建狀態(tài)下,線程壓根就沒有啟動,僅僅只是存在一個線程對象而已.
    Thread t = new Thread();//此時t就屬于新建狀態(tài)

    當(dāng)新建狀態(tài)下的線程對象調(diào)用了start方法,此時從新建狀態(tài)進入可運行狀態(tài).
    線程對象的start方法只能調(diào)用一次,否則報錯:IllegalThreadStateException.

  • 2: 可運行狀態(tài)(runnable):分成兩種狀態(tài),ready和running。分別表示就緒狀態(tài)和運行狀態(tài)盏筐。
    就緒狀態(tài):線程對象調(diào)用start方法之后,等待JVM的調(diào)度(此時該線程并沒有運行).
    運行狀態(tài):線程對象獲得JVM調(diào)度,如果存在多個CPU,那么允許多個線程并行運行.

  • 3: 阻塞狀態(tài)(blocked):正在運行的線程因為某些原因放棄CPU,暫時停止運行,就會進入阻塞狀態(tài).
    此時JVM不會給線程分配CPU,直到線程重新進入就緒狀態(tài),才有機會轉(zhuǎn)到運行狀態(tài).
    阻塞狀態(tài)只能先進入就緒狀態(tài),不能直接進入運行狀態(tài).
    阻塞狀態(tài)的兩種情況:

  • 1): 當(dāng)A線程處于運行過程時,試圖獲取同步鎖時,卻被B線程獲取.此時JVM把當(dāng)前A線程存到對象的鎖池中,A線程進入阻塞狀態(tài).

  • 2):當(dāng)線程處于運行過程時,發(fā)出了IO請求時,此時進入阻塞狀態(tài).

  • 4: 等待狀態(tài)(waiting)(等待狀態(tài)只能被其他線程喚醒):此時使用的無參數(shù)的wait方法,

    • 1):當(dāng)線程處于運行過程時,調(diào)用了wait()方法,此時JVM把當(dāng)前線程存在對象等待池中.
  • 5: 計時等待狀態(tài)(timed waiting)(使用了帶參數(shù)的wait方法或者sleep方

  • 6: 終止?fàn)顟B(tài)(terminated):通常稱為死亡狀態(tài),表示線程終止.

  • 1): 正常執(zhí)行完run方法而退出(正常死亡).

  • 2): 遇到異常而退出(出現(xiàn)異常之后,程序就會中斷)(意外死亡).


線程一旦終止,就不能再重啟啟動,否則報錯(IllegalThreadStateException).

在Thread類中過時的方法(因為存在線程安全問題,所以棄用了):
void suspend() :暫停當(dāng)前線程
void resume() :恢復(fù)當(dāng)前線程
void stop() :結(jié)束當(dāng)前線程

六, 聯(lián)合線程:

線程的join方法表示一個線程等待另一個線程完成后才執(zhí)行赤赊。join方法被調(diào)用之后伍宦,線程對象處于阻塞狀態(tài)。
有人也把這種方式稱為聯(lián)合線程霸旗,就是說把當(dāng)前線程和當(dāng)前線程所在的線程聯(lián)合成一個線程签舞。

class Join extends Thread{
public void run(){
    for(int i=0;i<50;i++){
        System.out.println("join:"+i);
    }
}
}
//聯(lián)合線程
public class UniteThread {

public static void main(String[] args) throws Exception {
    System.out.println("begin.....");
    Join joinThread = new Join();
    for(int i=0;i<50;i++){
        System.out.println("main:"+i);
        if(i==10){
            //啟動join線程
            joinThread.start();
        }
        if(i==20){
            //強制執(zhí)行該線程,執(zhí)行結(jié)束再執(zhí)行其他線程
             joinThread.join();
        }   
    }
    System.out.println("end");
  }
}
七, 后臺線程

后臺線程:在后臺運行的線程秕脓,其目的是為其他線程提供服務(wù),也稱為“守護線程"儒搭。JVM的垃圾回收線程就是典型的后臺線程吠架。
特點:若所有的前臺線程都死亡,后臺線程自動死亡,前臺線程沒有結(jié)束,后臺線程是不會結(jié)束的搂鲫。
測試線程對象是否為后臺線程:使用thread.isDaemon()傍药。
前臺線程創(chuàng)建的線程默認是前臺線程,可以通過setDaenon(true)方法設(shè)置為后臺線程,并且當(dāng)且僅當(dāng)后臺線程創(chuàng)建的新線程時,新線程是后臺線程默穴。
設(shè)置后臺線程:thread.setDaemon(true),該方法必須在start方法調(diào)用前怔檩,否則出現(xiàn)IllegalThreadStateException異常。

public class DaemonThread extends Thread {
public void run() {
    for (int i = 0; i < 100; i++) {
        System.out.println(super.getName() + "-" + i);
    }
}
public static void main(String[] args) {
    System.out.println(Thread.currentThread().isDaemon());
    for (int i = 0; i < 50; i++) {
        System.out.println("main:" + i);
        if (i == 10) {
            DaemonThread t = new DaemonThread();
            t.setDaemon(true);
            t.start();
        }
    }
  }
}
八,線程池的用法
// Executors.newCachedThreadPool();    
//創(chuàng)建一個緩沖池蓄诽,緩沖池容量大小為Integer.MAX_VALUE
// Executors.newSingleThreadExecutor();   
//創(chuàng)建容量為1的緩沖池
// Executors.newFixedThreadPool(int);    
//創(chuàng)建固定容量大小的緩沖池
class MyTask implements Runnable {
public MyTask() {
}
@Override
public void run() {
 //do something
}
}

ExecutorService executor =  Executors.newFixedThreadPool(5)
MyTask myTask = new MyTask();
executor.execute(myTask);

對于單次提交數(shù)據(jù)的數(shù)量薛训,當(dāng)然單次數(shù)量越少越快,但是次數(shù)會變多仑氛,總體時間會變長乙埃,單次提交過多闸英,執(zhí)行會非常慢,以至于可能會失敗介袜,經(jīng)過多次測試數(shù)據(jù)量在幾千到一萬時是比較能夠接受的甫何。
選擇那種線程池呢,是固定大小的遇伞,還是無限增長的辙喂。當(dāng)線程數(shù)量超過限制時會如何呢?這幾種線程池都會拋出異常鸠珠。
有一定經(jīng)驗的同志會不屑的說阻塞的線程池巍耗,基本就比較靠譜,例如加上等待隊列渐排,等待隊列用一個阻塞的隊列炬太。小的缺點是一直創(chuàng)建線程,感覺也不是非常的合理驯耻。

  • 帶隊列的線程池

    ThreadPoolExecutor  executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
               new ArrayBlockingQueue(5));
    

使用生產(chǎn)者與消費者對程序進行改進

Producer.java 生產(chǎn)者

import java.util.concurrent.ArrayBlockingQueue;
public class Producerlocal implements Runnable {
ArrayBlockingQueue<String> queue;
public Producerlocal(ArrayBlockingQueue<String> queue) {
    this.queue = queue;
}

@Override
public void run() {

    try {
        for (int i = 0; i < 1000; i++) {
            queue.put("s" + i);
        }

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
}

Consumer.java消費者

import java.util.concurrent.ArrayBlockingQueue;
public class Consumerlocal implements Runnable {

ArrayBlockingQueue<String> queue;

public Consumerlocal(ArrayBlockingQueue<String> queue) {
    this.queue = queue;
}

@Override
public void run() {
    while (true) {
        try {
            final String take = queue.take();

            if ("poisonpill".equals(take)) {
                return;
            }
            //do something
            System.out.println(take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

main主程序

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws InterruptedException {
    int threadNum = Runtime.getRuntime().availableProcessors() * 2;
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < threadNum; i++) {
        executor.execute(new Consumerlocal(queue));
    }
    Thread pt = new Thread(new Producerlocal(queue));
    pt.start();
    pt.join();
    for (int i = 0; i < threadNum; i++) {
        queue.put("poisonpill");
    }

    executor.shutdown();
    executor.awaitTermination(10L, TimeUnit.DAYS);
}
}

程序使用了阻塞隊列亲族,隊列設(shè)置一定的大小,加入隊列超過數(shù)量會阻塞可缚,隊列空了取值也會阻塞霎迫,感興趣的同學(xué)可以查看jdk源碼矾麻。消費者線程數(shù)是CPU的兩倍肌访,對于這些類的使用需要查看手冊和寫測試代碼。對于何時結(jié)束線程也有一定的小技巧捣作,加入足夠數(shù)量的毒丸测柠。

對于代碼使用了新的模式,程序明顯加快了缘滥,到這里生產(chǎn)者消費者模式基本就結(jié)束了轰胁。如果你下次想起你的程序也需要多線程,正好適合這種模式朝扼,那么套用進來就是很好的選擇赃阀。當(dāng)然你現(xiàn)在能做的就是擼起袖子,寫一些測試代碼擎颖,找到這種模式的感覺榛斯。

因為程序的大多數(shù)時間還是在http請求上,程序的運行時間仍然不能夠接受搂捧。于是想到了利用異步io加快速度驮俗,而不用阻塞的http。但是問題是這次的http客戶端為了安全驗證進行了修改允跑,有加密驗證和單點登錄王凑,新的客戶端能適配起來有一定難度估計需要一定的時間搪柑,還是怕搞不定。異步的非阻塞io索烹,對于前面數(shù)據(jù)結(jié)果選擇的經(jīng)驗工碾,非阻塞不一定就是好!其實是沒太看懂怎么在多線程中使用,而對于所得到的效果就不得而知了百姓。

maven依賴
   <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpasyncclient</artifactId>
        <version>4.1.3</version>
    </dependency>
異步http
/*
 *    ===============================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at

 *   http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * =============================================== *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package com.github.yfor.bigdata.tdg;

import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncCharConsumer;
 import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/**
  * This example demonstrates a pipelinfed execution of multiple HTTP request / response exchanges
 * with a full content streaming.
 */
public class MainPhttpasyncclient {

public static void main(final String[] args) throws Exception {
    CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
    try {
        httpclient.start();

        HttpHost targetHost = new HttpHost("httpbin.org", 80);
        HttpGet[] resquests = {
                new HttpGet("/"),
                new HttpGet("/ip"),
                new HttpGet("/headers"),
                new HttpGet("/get")
        };

        List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
        List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
        for (HttpGet request : resquests) {
            requestProducers.add(new MyRequestProducer(targetHost, request));
            responseConsumers.add(new MyResponseConsumer(request));
        }

        Future<List<Boolean>> future = httpclient.execute(
                targetHost, requestProducers, responseConsumers, null);
        future.get();
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
    System.out.println("Done");
}

static class MyRequestProducer extends BasicAsyncRequestProducer {

    private final HttpRequest request;

    MyRequestProducer(final HttpHost target, final HttpRequest request) {
        super(target, request);
        this.request = request;
    }

    @Override
    public void requestCompleted(final HttpContext context) {
        super.requestCompleted(context);
        System.out.println();
        System.out.println("Request sent: " + this.request.getRequestLine());
        System.out.println("=================================================");
    }
}

static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {

    private final HttpRequest request;

    MyResponseConsumer(final HttpRequest request) {
        this.request = request;
    }

    @Override
    protected void onResponseReceived(final HttpResponse response) {
        System.out.println();
        System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
        System.out.println("=================================================");
    }

    @Override
    protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
        while (buf.hasRemaining()) {
            buf.get();
        }
    }

    @Override
    protected void releaseResources() {
    }

    @Override
    protected Boolean buildResult(final HttpContext context) {
        System.out.println();
        System.out.println("=================================");
        System.out.println();
        return Boolean.TRUE;
    }
}
}
配置
package com.github.yfor.bigdata.tdg;

public interface KafkaProperties {
final static String zkConnect = "localhost:2181";
final static String groupId = "group21";
final static String topic = "topic4";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;

final static String clientId = "SimpleConsumerDemoClient";
}

kafka的配置需要一定的時間渊额,可以閱讀官方文檔進行安裝并運行。

生產(chǎn)者線程
package com.github.yfor.bigdata.tdg;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
private final int size;

public Producer(String topic) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "DemoProducer");
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<Integer, String>(props);
    this.topic = topic;
    this.isAsync = true;
    this.size = producer.partitionsFor(topic).size();

}

@Override
public void run() {
    int messageNo = 1;
    while (messageNo < 100) {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously 異步
            producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
                    new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously 同步
            try {
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }

}

}

class DemoCallBack implements Callback {

private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
    this.startTime = startTime;
    this.key = key;
    this.message = message;
}

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    long elapsedTime = System.currentTimeMillis() - startTime;
    if (metadata != null) {
        System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
    } else {
        exception.printStackTrace();
    }
}
}
消費者線程
package com.github.yfor.bigdata.tdg;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class KafkaConsumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
private final int size;

public KafkaConsumer(String topic) {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
    this.topic = topic;
    this.size = 5;
}

private static ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "40000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

@Override
public void run() {
    try {
        sleep(2000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(size));

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(size);
    for (final KafkaStream stream : streams) {
        executor.submit(new KafkaConsumerThread(stream));
    }
}
}

class KafkaConsumerThread implements Runnable {

private KafkaStream<byte[], byte[]> stream;

public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
    this.stream = stream;
}

public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                + "offset[" + mam.offset() + "], " + new String(mam.message()));

    }
}
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末垒拢,一起剝皮案震驚了整個濱河市旬迹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌子库,老刑警劉巖舱权,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異仑嗅,居然都是意外死亡宴倍,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門仓技,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鸵贬,“玉大人,你說我怎么就攤上這事脖捻±疲” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵地沮,是天一觀的道長嗜浮。 經(jīng)常有香客問我,道長摩疑,這世上最難降的妖魔是什么危融? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮雷袋,結(jié)果婚禮上吉殃,老公的妹妹穿的比我還像新娘。我一直安慰自己楷怒,他們只是感情好蛋勺,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鸠删,像睡著了一般抱完。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上冶共,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天乾蛤,我揣著相機與錄音每界,去河邊找鬼。 笑死家卖,一個胖子當(dāng)著我的面吹牛眨层,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播上荡,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼趴樱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酪捡?” 一聲冷哼從身側(cè)響起叁征,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎逛薇,沒想到半個月后捺疼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡永罚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年啤呼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呢袱。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡官扣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出羞福,到底是詐尸還是另有隱情惕蹄,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布治专,位于F島的核電站卖陵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏张峰。R本人自食惡果不足惜赶促,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望挟炬。 院中可真熱鬧,春花似錦嗦哆、人聲如沸谤祖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粥喜。三九已至,卻和暖如春橘券,著一層夾襖步出監(jiān)牢的瞬間额湘,已是汗流浹背卿吐。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锋华,地道東北人嗡官。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像毯焕,于是被迫代替她去往敵國和親衍腥。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容