php使用kafka

Ⅰ瘟滨、kafka擴展安裝參考

Ⅱ锯蛀、代碼
1、生產(chǎn)者

<?php
class KafkaProducer
{
    const TAG = 'KafkaProducer';

    /** @var RdKafka\ProducerTopic */
    private static $topic;

    /** @var  RdKafka\Producer */
    private static $producer;

    /**
     * @param string $brokerList
     * @param string $topic
     * @param array $isSsl 
     * @return KafkaProducer
     * @throws Exception
     */
    public static function init($brokerList, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }

        $conf = new RdKafka\Conf();

        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('broker.version.fallback', '1.0.0');
        $conf->set('compression.type', 'lz4');
        $conf->set("retries", 3);
        $conf->set("retry.backoff.ms", 1000);

        $conf->setDrMsgCb(function ($kafka, $message) {
//            static::log('推送成功:' . var_export($message, true));
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('推送失斚诰:' . rd_kafka_err2str($err) . '严嗜。緣由:' . $reason);
        });

        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }

        static::$producer = new RdKafka\Producer($conf);
        static::$topic = static::$producer->newTopic($topic);

        return new KafkaProducer();
    }

    /**
     * @param $message
     * @param int $retries
     * @throws Exception
     */
    public function producer($message, $retries = 10)
    {
        try {
            static::$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));

            static::$producer->poll(0);

            // 線上版本不支持flush方法
            if (!method_exists(static::$producer, 'flush')) {
                return;
            }

            $result = RD_KAFKA_RESP_ERR_NO_ERROR;

            for ($flushRetries = 0; $flushRetries < $retries; $flushRetries++) {
                $result = static::$producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    break;
                }
            }

            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('Was unable to flush, messages might be lost!');
            }
        } catch (Exception $e) {
            $this->log(array('msg' => json_encode($message), 'err_msg' => $e->getMessage()));
            throw $e;
        }
    }

    public function log($message)
    {
        $message = PHP_EOL . "時間:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($message, true) . PHP_EOL;
        error_log($message, 3, ROOT_DIR . '/log/KafkaError.log');
    }
}

2、消費者

<?php
<?php
require_once dirname(__FILE__) . '/../constant.php';

class KafkaConsumer
{
    /** @var RdKafka\ConsumerTopic */
    private static $topic;

    /** @var  RdKafka\KafkaConsumer */
    private static $consumer;

    /**
     * @param string $brokerList
     * @param string $groupId
     * @param string $topic
     * @param array $isSsl 公司集群每個topic都得申請認(rèn)證用戶和密碼
     * @return KafkaConsumer
     * @throws Exception
     */
    public static function init($brokerList, $groupId, $topic, $isSsl = null)
    {
        if (!extension_loaded('rdkafka')) {
            throw new Exception("no rdkafka extension", 1);
        }
        $conf = new \RdKafka\Conf();
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(NULL);
                    break;
                default:
                    throw new \Exception($err);
            }
        });
        if ($isSsl) {
            $conf->set('security.protocol', KAFKA_SECURITY_PROTOCOL);
            $conf->set('sasl.mechanism', KAFKA_SASL_MECHANISM);
            $conf->set('sasl.username', $isSsl['username']);
            $conf->set('sasl.password', $isSsl['password']);
        }
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception('消費失斨薷摇:' . rd_kafka_err2str($err) . '漫玄。緣由:' . $reason);
        });
        $conf->set('group.id', $groupId);
        $conf->set('metadata.broker.list', $brokerList);
        $conf->set('topic.metadata.refresh.interval.ms', 60000);    // Topic metadata 刷新間隔,毫秒压彭。metadata 自動刷新錯誤和連接睦优。設(shè)置為 -1 關(guān)閉刷新間隔。
        $conf->set('socket.keepalive.enable', true);          // Broker sockets 允許 TCP 保持活力

        $conf->set('auto.commit.interval.ms', 100);
        $conf->set('auto.offset.reset', 'latest');//smallest

        static::$consumer = new \RdKafka\KafkaConsumer($conf);
        static::$consumer->subscribe([$topic]);

        return new KafkaConsumer();
    }

    /**
     * @param callable|null $callback
     * @throws Exception
     */
    public static function consumer($callback = null)
    {
        try {
            $message = static::$consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    if ($callback)
                        call_user_func($callback, $message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    throw new \Exception("No more messages; will wait for more\n");
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    throw new \Exception("Timed out\n");
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        } catch (Exception $e) {
            $message = PHP_EOL . "時間:" . date('Y-m-d H:i:s') . PHP_EOL . var_export($e->getMessage(), true) . PHP_EOL;
            error_log($message, 3, ROOT_DIR . '/log/KafkaError.log');
            throw $e;
        }
    }
}

3壮不、生產(chǎn)者調(diào)用

try {
    $brokerList = '127.0.0.1:9092';
    $producer = KafkaProducer::init($brokerList, 'test_cluster',
        array('username' => 'writer', 'password' => '123456'));
    for ($i = 0; $i < 10; $i++) {
        $producer->producer(array('bc' => 201700, 'numd' => $i), 2);
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}

4汗盘、消費者調(diào)用

try {
    $brokerList = '127.0.0.1:9092';
    $kc = KafkaConsumer::init(
        $brokerList,
        'test-consumer-group',
        'test_cluster',
        array('username' => 'reader', 'password' => '123456')
    );
    while (true) {
        $kc->consumer(function ($message) {
            var_dump($message);
        });
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市忆畅,隨后出現(xiàn)的幾起案子衡未,更是在濱河造成了極大的恐慌,老刑警劉巖家凯,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缓醋,死亡現(xiàn)場離奇詭異,居然都是意外死亡绊诲,警方通過查閱死者的電腦和手機送粱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掂之,“玉大人抗俄,你說我怎么就攤上這事脆丁。” “怎么了动雹?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵槽卫,是天一觀的道長。 經(jīng)常有香客問我胰蝠,道長歼培,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任茸塞,我火速辦了婚禮躲庄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钾虐。我一直安慰自己噪窘,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布效扫。 她就那樣靜靜地躺著倔监,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荡短。 梳的紋絲不亂的頭發(fā)上丐枉,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音掘托,去河邊找鬼。 笑死籍嘹,一個胖子當(dāng)著我的面吹牛闪盔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辱士,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼泪掀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了颂碘?” 一聲冷哼從身側(cè)響起异赫,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎头岔,沒想到半個月后塔拳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡峡竣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年靠抑,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片适掰。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡颂碧,死狀恐怖荠列,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情载城,我是刑警寧澤肌似,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站诉瓦,受9級特大地震影響川队,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜垦搬,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一呼寸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧猴贰,春花似錦对雪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至栅干,卻和暖如春迈套,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背碱鳞。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工桑李, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人窿给。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓贵白,卻偏偏與公主長得像,于是被迫代替她去往敵國和親崩泡。 傳聞我的和親對象是個殘疾皇子禁荒,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容