爬蟲(chóng)架構(gòu)|利用Kafka處理數(shù)據(jù)推送問(wèn)題(2)

在前一篇文章爬蟲(chóng)架構(gòu)|利用Kafka處理數(shù)據(jù)推送問(wèn)題(1)中對(duì)Kafka做了一個(gè)介紹无拗,以及環(huán)境搭建,最后是選擇使用阿里云的Kafka,這一篇文章繼續(xù)說(shuō)使用阿里云的Kafka的一些知識(shí)鹏浅。

一、發(fā)布者最佳實(shí)踐

發(fā)布的完整代碼(根據(jù)自己的業(yè)務(wù)做相應(yīng)處理):

package com.yimian.controller.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.yimian.model.SpiderData;

/**
 * 生產(chǎn)者
 * 
 * @author huangtao
 *
 */
@Controller
@RequestMapping(value = "kafka/producer")
public class KafkaProducerController {

    private static Producer<String, String> producer;
    private static Properties kafkaProperties;

    static {
        // 設(shè)置sasl文件的路徑
        JavaKafkaConfigurer.configureSasl();
        // 加載kafka.properties
        kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // 設(shè)置接入點(diǎn)柬唯,請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // 設(shè)置SSL根證書(shū)的路徑,請(qǐng)記得將XXX修改為自己的路徑
        // 與sasl路徑類(lèi)似属提,該文件也不能被打包到j(luò)ar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // 根證書(shū)store的密碼权逗,保持不變
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // 接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL鑒權(quán)方式冤议,保持不變
        props.put(SaslConfigs.SASL_MECHANISM, "ONS");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 請(qǐng)求的最長(zhǎng)等待時(shí)間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);

        // 構(gòu)造Producer對(duì)象斟薇,注意,該對(duì)象是線(xiàn)程安全的恕酸,一般來(lái)說(shuō)堪滨,一個(gè)進(jìn)程內(nèi)一個(gè)Producer對(duì)象即可;
        // 如果想提高性能蕊温,可以多構(gòu)造幾個(gè)對(duì)象袱箱,但不要太多遏乔,最好不要超過(guò)5個(gè)
        producer = new KafkaProducer<String, String>(props);
    }
    
    /**
     * 發(fā)送消息給kafka
     * @param topic
     * @param msg
     */
    public static void sendMsgToKafka(String topic, SpiderData msg) {
        try {
            // 發(fā)送消息,并獲得一個(gè)Future對(duì)象
            Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic, String.valueOf(new Date().getTime()),
                    JSON.toJSONString(msg)));
            // 同步獲得Future對(duì)象的結(jié)果
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println("Produce ok:" + recordMetadata.toString());
        } catch (Exception e) {
            /**
             * 要考慮重試~
             * 在分布式環(huán)境下发笔,由于網(wǎng)絡(luò)等原因盟萨,偶爾的發(fā)送失敗是常見(jiàn)的。這種失敗有可能是消息已經(jīng)發(fā)送成功了讨,但是 Ack 失敗捻激,也有可能是確實(shí)沒(méi)發(fā)送成功。
             * 消息隊(duì)列 Kafka 是 VIP 網(wǎng)絡(luò)架構(gòu)前计,會(huì)主動(dòng)掐掉空閑連接(一般 30 秒沒(méi)活動(dòng))胞谭,也就是說(shuō),不是一直活躍的客戶(hù)端會(huì)經(jīng)常收到”connection rest by peer”這樣的錯(cuò)誤男杈,因此建議都考慮重試丈屹。
             */
            // 參考常見(jiàn)報(bào)錯(cuò):
            // https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    @RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
    @ResponseBody
    public void init() {
        // 構(gòu)造一個(gè)Kafka消息
        String topic = kafkaProperties.getProperty("topic"); // 消息所屬的Topic,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后伶棒,填寫(xiě)在這里
        SpiderData data = new SpiderData();
        data.setDescUrl("www.baidu.com");
        data.setTitle("百度");

        sendMsgToKafka(topic, data);
    }
}

Kafka的發(fā)送非常簡(jiǎn)單旺垒,代碼片段如下:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
    topic,   \\ topic
    null,    \\ 分區(qū)編號(hào),這里最好為 null肤无,交給 producer 去分配
    System.currentTimeMillis(), \\時(shí)間戳
    String.valueOf(message.hashCode()), \\ key袖牙,可以在控制臺(tái)通過(guò)這個(gè) Key 查找消息,這個(gè) key 最好唯一舅锄;
    message)); \\ value,消息內(nèi)容

message可以是一個(gè)JSON類(lèi)型的對(duì)象司忱,如上面例子中的JSON.toJSONString(new SpiderData())

1.1皇忿、Key 和 Value

Kafka 0.10.0.0 的消息字段只有兩個(gè):Key 和 Value。為了便于追蹤坦仍,重要消息最好都設(shè)置一個(gè)唯一的 Key鳍烁。通過(guò) Key 追蹤某消息,打印發(fā)送日志和消費(fèi)日志繁扎,了解該消息的發(fā)送和消費(fèi)情況幔荒;更重要的是,您可以在控制臺(tái)可以根據(jù) Key 查詢(xún)消息的內(nèi)容梳玫。

1.2爹梁、失敗重試

在分布式環(huán)境下,由于網(wǎng)絡(luò)等原因提澎,偶爾的發(fā)送失敗是常見(jiàn)的姚垃。這種失敗有可能是消息已經(jīng)發(fā)送成功,但是 Ack 失敗盼忌,也有可能是確實(shí)沒(méi)發(fā)送成功积糯。
消息隊(duì)列 Kafka 是 VIP 網(wǎng)絡(luò)架構(gòu)掂墓,會(huì)主動(dòng)掐掉空閑連接(一般 30 秒沒(méi)活動(dòng)),也就是說(shuō)看成,不是一直活躍的客戶(hù)端會(huì)經(jīng)常收到”connection rest by peer”這樣的錯(cuò)誤君编,因此建議都考慮重試。

1.3川慌、異步發(fā)送

需要注意的是這個(gè)接口是異步發(fā)送的吃嘿;如果你想得到發(fā)送的結(jié)果,可以調(diào)用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)窘游。

1.4唠椭、線(xiàn)程安全

Producer 是線(xiàn)程安全的,且可以往任何 Topic 發(fā)送消息忍饰。一般一個(gè)應(yīng)用贪嫂,對(duì)應(yīng)一個(gè) Producer 就足夠了。

1.5艾蓝、Ack

消息隊(duì)列 Kafka 沒(méi)有考慮這個(gè)參數(shù)力崇,都認(rèn)為是“all”,即所有消息同步到 Slave 節(jié)點(diǎn)后才會(huì)返回成功的確認(rèn)消息給客戶(hù)端赢织。

1.6亮靴、Batch

Batch 的基本思路是:把消息緩存在內(nèi)存中,并進(jìn)行打包發(fā)送于置。Kafka 通過(guò) Batch 來(lái)提高吞吐茧吊,但同時(shí)也會(huì)增加延遲,生產(chǎn)時(shí)應(yīng)該對(duì)兩者予以權(quán)衡八毯。
在構(gòu)建 Producer 時(shí)搓侄,需要考慮以下兩個(gè)參數(shù):

  • batch.size : 發(fā)往每個(gè) Partition 的消息個(gè)數(shù)緩存量達(dá)到這個(gè)數(shù)值時(shí),就會(huì)觸發(fā)一次網(wǎng)絡(luò)請(qǐng)求话速,把消息真正發(fā)往服務(wù)器讶踪;
  • linger.ms : 每個(gè)消息待在緩存中的最大時(shí)間,超過(guò)這個(gè)時(shí)間泊交,就會(huì)忽略 batch.size 的限制乳讥,立即把消息發(fā)往服務(wù)器。

由此可見(jiàn)廓俭,Kafka 什么時(shí)候把消息真正發(fā)往服務(wù)器云石,是通過(guò)上面兩個(gè)參數(shù)共同決定的;
batch.size 有助于提高吞吐白指,linger.ms 有助于控制延遲留晚。您可以根據(jù)具體業(yè)務(wù)進(jìn)行調(diào)整。

1.7、OOM

結(jié)合 Kafka Batch 的設(shè)計(jì)思路错维,Kafka 會(huì)緩存消息并打包發(fā)送奖地,如果緩存太多,則有可能造成 OOM赋焕。

  • buffer.memory : 所有緩存消息的總體大小超過(guò)這個(gè)數(shù)值后参歹,就會(huì)觸發(fā)把消息發(fā)往服務(wù)器。此時(shí)會(huì)忽略 batch.sizelinger.ms 的限制隆判。
  • buffer.memory 的默認(rèn)數(shù)值是 32M犬庇,對(duì)于單個(gè) Producer 來(lái)說(shuō),可以保證足夠的性能侨嘀。需要注意的是臭挽,如果你在同一個(gè) JVM 中啟動(dòng)多個(gè) Producer,那么每個(gè) Producer 都有可能占用32M 緩存空間咬腕,此時(shí)便有可能觸發(fā) OOM欢峰。
  • 在生產(chǎn)時(shí),一般沒(méi)有必要啟動(dòng)多個(gè) Producer涨共;如果特殊情況需要纽帖,則需要考慮buffer.memory的大小,避免觸發(fā) OOM举反。

1.8懊直、分區(qū)順序

單個(gè)分區(qū)內(nèi),消息是按照發(fā)送順序儲(chǔ)存的火鼻,是基本有序的室囊。
但消息隊(duì)列 Kafka 并不保證單個(gè)分區(qū)內(nèi)絕對(duì)有序,所以在某些情況下魁索,會(huì)發(fā)生少量消息亂序波俄。比如:消息隊(duì)列 Kafka 為了提高可用性,某個(gè)分區(qū)掛掉后把消息 Failover 到其它分區(qū)蛾默。

二、訂閱者最佳實(shí)踐

消費(fèi)的完整代碼(根據(jù)自己的業(yè)務(wù)做相應(yīng)處理):

package com.yimian.controller.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yimian.model.SpiderData;

/**
 * 消費(fèi)者
 * 
 * @author huangtao
 *
 */
@Controller
@RequestMapping(value = "kafka/consumer")
public class KafkaConsumerController {

    private static Consumer<String, String> consumer;

    static {
        // 設(shè)置sasl文件的路徑
        JavaKafkaConfigurer.configureSasl();

        // 加載kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // 設(shè)置接入點(diǎn)捉貌,請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // 設(shè)置SSL根證書(shū)的路徑支鸡,請(qǐng)記得將XXX修改為自己的路徑
        // 與sasl路徑類(lèi)似,該文件也不能被打包到j(luò)ar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // 根證書(shū)store的密碼趁窃,保持不變
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // 接入?yún)f(xié)議牧挣,目前支持使用SASL_SSL協(xié)議接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL鑒權(quán)方式,保持不變
        props.put(SaslConfigs.SASL_MECHANISM, "ONS");
        // 兩次poll之間的最大允許間隔
        // 請(qǐng)不要改得太大醒陆,服務(wù)器會(huì)掐掉空閑連接瀑构,不要超過(guò)30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
        // 每次poll的最大數(shù)量
        // 注意該值不要改得太大,如果poll太多數(shù)據(jù)刨摩,而不能在下次poll之前消費(fèi)完寺晌,則會(huì)觸發(fā)一次負(fù)載均衡世吨,產(chǎn)生卡頓
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // 消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // 當(dāng)前消費(fèi)實(shí)例所屬的消費(fèi)組,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后填寫(xiě)
        // 屬于同一個(gè)組的消費(fèi)實(shí)例呻征,會(huì)負(fù)載消費(fèi)消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        // 構(gòu)造消息對(duì)象耘婚,也即生成一個(gè)消費(fèi)實(shí)例
        consumer = new KafkaConsumer<String, String>(props);

        // 設(shè)置消費(fèi)組訂閱的Topic,可以訂閱多個(gè)
        // 如果GROUP_ID_CONFIG是一樣陆赋,則訂閱的Topic也建議設(shè)置成一樣
        List<String> subscribedTopics = new ArrayList<String>();
        // 如果需要訂閱多個(gè)Topic沐祷,則在這里add進(jìn)去即可
        // 每個(gè)Topic需要先在控制臺(tái)進(jìn)行創(chuàng)建
        subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
    }
    
    @RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
    @ResponseBody
    public void init() {
        // 循環(huán)消費(fèi)消息
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // 必須在下次poll之前消費(fèi)完這些數(shù)據(jù), 且總耗時(shí)不得超過(guò)SESSION_TIMEOUT_MS_CONFIG
                // 建議開(kāi)一個(gè)單獨(dú)的線(xiàn)程池來(lái)消費(fèi)消息,然后異步返回結(jié)果
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject jsonMsg = JSON.parseObject(record.value());  
                    SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);  
                    
                    System.out.println(spiderData.toString());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                // 參考常見(jiàn)報(bào)錯(cuò):
                // https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
                e.printStackTrace();
            }
        }
    }
}

消費(fèi)時(shí)把JSON數(shù)據(jù)反序列化:

for (ConsumerRecord<String, String> record : records) {
    JSONObject jsonMsg = JSON.parseObject(record.value());  
    SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);  
}

2.1攒岛、消費(fèi)消息基本流程

Kafka 訂閱者在訂閱消息時(shí)的基本流程是:

  1. Poll 數(shù)據(jù)
  2. 執(zhí)行消費(fèi)邏輯
  3. 再次 poll 數(shù)據(jù)

2.2赖临、負(fù)載消費(fèi)

每個(gè) Consumer Group 可以包含多個(gè)消費(fèi)實(shí)例,也即可以啟動(dòng)多個(gè) Kafka Consumer灾锯,并把參數(shù) group.id 設(shè)置成相同的值兢榨。屬于同一個(gè) Consumer Group 的消費(fèi)實(shí)例會(huì)負(fù)載消費(fèi)訂閱的 topic。

示例1:Consumer Group A 訂閱了 Topic A挠进,并開(kāi)啟三個(gè)消費(fèi)實(shí)例 C1色乾、C2、C3领突,則發(fā)送到 Topic A 的每條消息最終只會(huì)傳給 C1暖璧、C2、C3 的某一個(gè)君旦。Kafka 默認(rèn)會(huì)均勻地把消息傳給各個(gè)消息實(shí)例澎办,以做到消費(fèi)負(fù)載均衡。

Kafka 負(fù)載消費(fèi)的內(nèi)部原理是金砍,把訂閱的 Topic 的分區(qū)局蚀,平均分配給各個(gè)消費(fèi)實(shí)例。因此恕稠,消費(fèi)實(shí)例的個(gè)數(shù)不要大于分區(qū)的數(shù)量琅绅,否則會(huì)有實(shí)例分配不到任何分區(qū)而處于空跑狀態(tài)。這個(gè)負(fù)載均衡發(fā)生的時(shí)間鹅巍,除了第一次啟動(dòng)上線(xiàn)之外千扶,后續(xù)消費(fèi)實(shí)例發(fā)生重啟、增加骆捧、減少等變更時(shí)澎羞,都會(huì)觸發(fā)一次負(fù)載均衡。

消息隊(duì)列 Kafka 分區(qū)的數(shù)量至少是 16 個(gè)敛苇,已經(jīng)足夠滿(mǎn)足大部分用戶(hù)的需求妆绞,且云上服務(wù)會(huì)根據(jù)容量調(diào)整分區(qū)數(shù)。

2.3、多個(gè)訂閱

一個(gè) Consumer Group 可以訂閱多個(gè) Topic括饶。一個(gè) Topic 也可以被多個(gè) Consumer Group 訂閱株茶,且各個(gè) Consumer Group 獨(dú)立消費(fèi) Topic 下的所有消息。

示例1:Consumer Group A 訂閱了 Topic A巷帝,Consumer Group B 也訂閱了 Topic A忌卤,則發(fā)送到 Topic A 的每條消息,不僅會(huì)傳一份給 Consumer Group A 的消費(fèi)實(shí)例楞泼,也會(huì)傳一份給 Consumer Group B 的消費(fèi)實(shí)例驰徊,且這兩個(gè)過(guò)程相互獨(dú)立,相互沒(méi)有任何影響堕阔。

2.4棍厂、消費(fèi)位點(diǎn)

每個(gè) Topic 會(huì)有多個(gè)分區(qū),每個(gè)分區(qū)會(huì)統(tǒng)計(jì)當(dāng)前消息的總條數(shù)超陆,這個(gè)稱(chēng)為最大位點(diǎn) MaxOffset牺弹。Kafka Consumer 會(huì)按順序依次消費(fèi)分區(qū)內(nèi)的每條消息,記錄已經(jīng)消費(fèi)了的消息條數(shù)时呀,稱(chēng)為ConsumerOffset张漂。

剩余的未消費(fèi)的條數(shù)(也稱(chēng)為消息堆積量) = MaxOffset - ConsumerOffset

2.5、位點(diǎn)提交

Kafka 消費(fèi)者有兩個(gè)相關(guān)參數(shù):

  • enable.auto.commit:默認(rèn)值為 true谨娜。
  • auto.commit.interval.ms: 默認(rèn)值為 1000航攒,也即 1s。

這兩個(gè)參數(shù)組合的結(jié)果就是趴梢,每次 poll 時(shí)漠畜,再拉取數(shù)據(jù)前會(huì)預(yù)先做下面這件事:

  • 檢查上次提交位點(diǎn)的時(shí)間,如果距離當(dāng)前時(shí)間已經(jīng)超過(guò) auto.commit.interval.ms坞靶,則啟動(dòng)位點(diǎn)提交動(dòng)作憔狞;

因此,如果 enable.auto.commit 設(shè)置為 true彰阴,需要在每次 poll 時(shí)瘾敢,確保前一次 poll 出來(lái)的數(shù)據(jù)已經(jīng)消費(fèi)完畢,否則可能導(dǎo)致位點(diǎn)跳躍尿这;

如果想自己控制位點(diǎn)提交廉丽,則把 enable.auto.commit 設(shè)為 false,并調(diào)用 commit(offsets)函數(shù)自行控制位點(diǎn)提交妻味。

2.6、消息重復(fù)以及消費(fèi)冪等

Kafka 消費(fèi)的語(yǔ)義是 “At Lease Once”欣福, 也就是至少投遞一次责球,保證消息不丟,但是不會(huì)保證消息不重復(fù)。在出現(xiàn)網(wǎng)絡(luò)問(wèn)題雏逾、客戶(hù)端重啟時(shí)均有可能出現(xiàn)少量重復(fù)消息嘉裤,此時(shí)應(yīng)用消費(fèi)端,如果對(duì)消息重復(fù)比較敏感(比如說(shuō)訂單交易類(lèi))栖博,則應(yīng)該做到消息冪等屑宠。

以數(shù)據(jù)庫(kù)類(lèi)應(yīng)用為例,常用做法是:

  • 發(fā)送消息時(shí)仇让,傳入 key 作為唯一流水號(hào)ID典奉;
  • 消費(fèi)消息時(shí),判斷 key 是否已經(jīng)消費(fèi)過(guò)丧叽,如果已經(jīng)消費(fèi)過(guò)了卫玖,則忽略,如果沒(méi)消費(fèi)過(guò)踊淳,則消費(fèi)一次假瞬;

當(dāng)然,如果應(yīng)用本身對(duì)少量消息重復(fù)不敏感迂尝,則不需要做此類(lèi)冪等檢查脱茉。

2.7、消費(fèi)失敗

Kafka 是按分區(qū)一條一條消息順序向前消費(fèi)推進(jìn)的垄开,如果消費(fèi)端拿到某條消息后消費(fèi)邏輯失敗琴许,比如應(yīng)用服務(wù)器出現(xiàn)了臟數(shù)據(jù),導(dǎo)致某條消息處理失敗说榆,等待人工干預(yù)虚吟,該怎么辦呢?

  • 如果失敗后一直嘗試再次執(zhí)行消費(fèi)邏輯签财,則有可能造成消費(fèi)線(xiàn)程阻塞在當(dāng)前消息串慰,無(wú)法向前推進(jìn),造成消息堆積唱蒸;
  • 由于 Kafka 自身沒(méi)有處理失敗消息的設(shè)計(jì)邦鲫,實(shí)踐中通常會(huì)打印失敗的消息、或者存儲(chǔ)到某個(gè)服務(wù)(比如創(chuàng)建一個(gè) Topic 專(zhuān)門(mén)用來(lái)放失敗的消息)神汹,然后定時(shí) check 失敗消息的情況庆捺,分析失敗原因,根據(jù)情況處理屁魏。

2.8滔以、消費(fèi)阻塞以及堆積

消費(fèi)端最常見(jiàn)的問(wèn)題就是消費(fèi)堆積,最常造成堆積的原因是:

  • 消費(fèi)速度跟不上生產(chǎn)速度氓拼,此時(shí)應(yīng)該提高消費(fèi)速度你画,詳情見(jiàn)本文下一節(jié)<提高消費(fèi)速度>抵碟;
  • 消費(fèi)端產(chǎn)生了阻塞;

消費(fèi)端拿到消息后坏匪,執(zhí)行消費(fèi)邏輯拟逮,通常會(huì)執(zhí)行一些遠(yuǎn)程調(diào)用,如果這個(gè)時(shí)候同步等待結(jié)果适滓,則有可能造成一直等待敦迄,消費(fèi)進(jìn)程無(wú)法向前推進(jìn)。

消費(fèi)端應(yīng)該竭力避免堵塞消費(fèi)線(xiàn)程凭迹,如果存在等待調(diào)用結(jié)果的情況罚屋,設(shè)置等待的超時(shí)時(shí)間,超過(guò)時(shí)間后蕊苗,作消費(fèi)失敗處理沿后。

2.9、提高消費(fèi)速度

提高消費(fèi)速度有兩個(gè)辦法:

  • 增加 Consumer 實(shí)例個(gè)數(shù)朽砰;
  • 增加消費(fèi)線(xiàn)程尖滚;

增加 Consumer 實(shí)例,可以在進(jìn)程內(nèi)直接增加(需要保證每個(gè)實(shí)例一個(gè)線(xiàn)程瞧柔,否則沒(méi)有太大意義)漆弄,也可以部署多個(gè)消費(fèi)實(shí)例進(jìn)程;需要注意的是造锅,實(shí)例個(gè)數(shù)超過(guò)分區(qū)數(shù)量后就不再能提高速度撼唾,將會(huì)有消費(fèi)實(shí)例不工作;

增加 Consumer 實(shí)例本質(zhì)上也是增加線(xiàn)程的方式來(lái)提升速度哥蔚,因此更加重要的性能提升方式是增加消費(fèi)線(xiàn)程倒谷,最基本的方式如下:

  1. 定義一個(gè)線(xiàn)程池;
  2. poll 數(shù)據(jù)糙箍;
  3. 把數(shù)據(jù)提交到線(xiàn)程池進(jìn)行并發(fā)處理渤愁;
  4. 等并發(fā)結(jié)果返回成功再次poll數(shù)據(jù)執(zhí)行。

2.10消息過(guò)濾

Kafka 自身沒(méi)有消息過(guò)濾的語(yǔ)義深夯。實(shí)踐中可以采取以下兩個(gè)辦法:

  • 如果過(guò)濾的種類(lèi)不多抖格,可以采取多個(gè) Topic 的方式達(dá)到過(guò)濾的目的;
  • 如果過(guò)濾的種類(lèi)多咕晋,則最好在客戶(hù)端業(yè)務(wù)層面自行過(guò)濾雹拄。

實(shí)踐中根據(jù)業(yè)務(wù)具體情況進(jìn)行選擇,可以綜合運(yùn)用上面兩種辦法掌呜。

2.11滓玖、消息廣播

Kafka 自身沒(méi)有消息廣播的語(yǔ)義,可以通過(guò)創(chuàng)建不同的 Consumer Group來(lái)模擬實(shí)現(xiàn)质蕉。

2.12势篡、訂閱關(guān)系

同一個(gè) Consumer Group 內(nèi)损姜,各個(gè)消費(fèi)實(shí)例訂閱的 Topic 最好保持一致,避免給排查問(wèn)題帶來(lái)干擾殊霞。


參考資料:阿里云消息隊(duì)列Kafka的幫助文檔

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市汰蓉,隨后出現(xiàn)的幾起案子绷蹲,更是在濱河造成了極大的恐慌,老刑警劉巖顾孽,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祝钢,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡若厚,警方通過(guò)查閱死者的電腦和手機(jī)拦英,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)测秸,“玉大人疤估,你說(shuō)我怎么就攤上這事■耄” “怎么了铃拇?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)沈撞。 經(jīng)常有香客問(wèn)我慷荔,道長(zhǎng),這世上最難降的妖魔是什么缠俺? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任显晶,我火速辦了婚禮,結(jié)果婚禮上壹士,老公的妹妹穿的比我還像新娘磷雇。我一直安慰自己,他們只是感情好墓卦,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開(kāi)白布倦春。 她就那樣靜靜地躺著,像睡著了一般落剪。 火紅的嫁衣襯著肌膚如雪睁本。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,215評(píng)論 1 299
  • 那天忠怖,我揣著相機(jī)與錄音呢堰,去河邊找鬼。 笑死凡泣,一個(gè)胖子當(dāng)著我的面吹牛枉疼,可吹牛的內(nèi)容都是我干的皮假。 我是一名探鬼主播,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼骂维,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惹资!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起航闺,我...
    開(kāi)封第一講書(shū)人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤褪测,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后潦刃,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體侮措,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年乖杠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了分扎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡胧洒,死狀恐怖畏吓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情略荡,我是刑警寧澤庵佣,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站汛兜,受9級(jí)特大地震影響巴粪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜粥谬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一肛根、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧漏策,春花似錦派哲、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至感耙,卻和暖如春褂乍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背即硼。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工逃片, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人只酥。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓褥实,卻偏偏與公主長(zhǎng)得像呀狼,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子损离,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容