date: 2019-07-17 22:42:21
title: hyperf| 帶你一起看 hyperf 文檔之 amqp
hyperf 開源有一段時(shí)間了, 從開發(fā)者交流群就能感受到熱度. 這段時(shí)間下來, 有一個(gè)明顯的現(xiàn)象, 某A 提了一個(gè)技術(shù)問題, 某B 直接拋一個(gè)官方文檔的對應(yīng)鏈接. 這種現(xiàn)象實(shí)在太常見, 甚至衍生出了 歡迎進(jìn)入 vip 交流群 這樣的商機(jī). 不得不說:
花 2 個(gè)小時(shí)認(rèn)真看一遍文檔, 比遇到問題就卡住然后到處問要高效得多.
重要的事情說三遍:
- 認(rèn)真看一遍文檔
- 認(rèn)真看一遍文檔
- 認(rèn)真看一遍文檔
好了, 回到正題, 今天我們來玩 amqp.
項(xiàng)目準(zhǔn)備
這些都可以在文檔中找到, 所以直接上操作.
- 使用開發(fā)組提供的 docker
hold 不住 docker, 不用也行, 但是要能基于開發(fā)組的 Dockerfile 配置好環(huán)境, 如果既不會(huì)用 docker, 也無法自己配置好開發(fā)環(huán)境, 請一定要努力哦.
使用 docker-compose 配置的全部環(huán)境:
version: '3'
services:
ms:
image: hyperf/hyperf
volumes:
- ../:/data
ports:
- "9501:9501"
environment:
APP_ENV: dev
tty: true
mysql:
image: mysql:5.7.26
volumes:
- ./config/my.cnf:/etc/mysql/conf.d/my.cnf
- ./config/sql:/docker-entrypoint-initdb.d
- ./data/mysql:/var/lib/mysql
ports:
- "3306:3306"
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: root
redis:
image: redis:alpine
volumes:
- ./config/redis.conf:/etc/redis/redis.conf
- ./data/redis:/data
ports:
- "6379:6379"
rabbitmq:
image: rabbitmq:management-alpine
hostname: myrabbitmq
volumes:
- ./data/rabbitmq:/var/lib/rabbitmq/mnesia
ports:
- "5672:5672" # mq
- "15672:15672" # admin
這里多說一句, docker / Dockerfile / docker-compose 只是滿足開發(fā)環(huán)境的使用的話, 真的很簡單, 記住幾個(gè)常用的 docker 命令, 清楚 Dockerfile 幾個(gè)常用的指令(RUN CMD 等), docker-compose 只是 yaml 格式的配置文件而已.
推薦一個(gè)好習(xí)慣: 一個(gè)文檔專門記 docker / Dockerfile / docker-compose 的常用內(nèi)容, 使用過程中逐漸對這個(gè)文件進(jìn)行增刪查改(CRUD), 不用多久, 你就會(huì)發(fā)現(xiàn)自己用起 docker 來, 賊 6 !
另一個(gè)好習(xí)慣是 最佳實(shí)踐, 你要從無到有用起來很難, 但是跟著最佳實(shí)踐走, 就能又快又好 ! 當(dāng)然, 再上一層樓, 你也能成為最佳實(shí)踐.
- 安裝項(xiàng)目
composer create-project hyperf/hyperf-skeleton hyperf-demo
安裝過程選擇自己需要的組件, 不清楚就先不要選, 反正之后可以通過 composer require
安裝. 其實(shí)我更想說的是:
安裝的組件自己 hold 不住, 然后到處叫, 這樣多沒意思呀.
- 配置 composer.json
"repositories": {
"hyperf": {
"type": "path",
"url": "../hyperf/src/*"
},
"packagist": {
"type": "composer",
"url": "https://mirrors.aliyun.com/composer"
}
}
添加了 path, 從我本地加載 hyperf 組件, 方便開發(fā), 如果不參與 hyperf 組件開發(fā), 可以忽略這一步.
- 添加 hyperf/amqp
# 安裝
composer require hyperf/amqp
# 添加配置文件
php bin/hyperf.php vendor:publish hyperf/amqp
- 修改配置, 啟動(dòng)并驗(yàn)證
我啟動(dòng)了 mysql / redis / rabbitmq, 配置相關(guān)組件的配置, 并啟動(dòng)框架進(jìn)行驗(yàn)證
# config
vim .env
vim config/autoload/redis.php
vim config/autoload/database.php
vim config/autoload/amqp.php
# test
php bin/hyperf.php start
好了, 項(xiàng)目準(zhǔn)備好了, 正式開始擼代碼.
官方文檔 amqp demo
文檔有的, 還是直接貼:
# producer
php bin/hyperf.php gen:amqp-producer DemoProducer
# consumer
php bin/hyperf.php gen:amqp-consumer DemoConsumer
# 使用 command 盜用 DemoProducer 進(jìn)行驗(yàn)證
php bin/hyperf.php gen:command TestCommand
producer 發(fā)個(gè)消息:
- 設(shè)置 command 的名字:
parent::__construct('t');
- 使用
@Inject()
注解注入 - 發(fā)消息, 一行搞定:
$this->producer->produce(new DemoProducer('test'. date('Y-m-d H:i:s')));
<?php
declare(strict_types=1);
namespace App\Command;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;
/**
* @Command
*/
class TestCommand extends HyperfCommand
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @Inject()
* @var Producer
*/
protected $producer;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct('t');
}
public function configure()
{
$this->setDescription('Hyperf Demo Command');
}
public function handle()
{
$this->producer->produce(new DemoProducer('test'. date('Y-m-d H:i:s')));
}
}
愉快的玩耍起來:
# produce
php bin/hyperf.php t
# consume
php bin/hyperf.php start # 會(huì)使用 swoole process 啟動(dòng) DemoConsumer
# 也可以訪問 rabbitmq admin 控制臺(tái)
http://localhost:15672
擼一擼 rabbitmq 官網(wǎng) tutorial
跟著 rabbitmq 官網(wǎng) tutorial, 見識一下 hyperf 中的 amqp 有多簡單
// consumer
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = 'hello';
protected $type = Type::FANOUT;
protected $queue = 'hello';
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = 'hello';
protected $type = Type::FANOUT;
protected $routingKey = 'hello';
public function __construct($data)
{
$this->payload = $data;
}
}
設(shè)置一下 nums
參數(shù), 就可以多進(jìn)程.
// Consumer
/**
* @Consumer(nums=2)
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = 'task';
protected $type = Type::FANOUT;
protected $queue = 'task';
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = 'task';
protected $type = Type::FANOUT;
protected $routingKey = 'task';
public function __construct($data)
{
$this->payload = $data;
}
}
和上面的 hello world
一致
終于看到 routing_key
的作用了
// consumer
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = 'routing';
protected $type = Type::DIRECT;
// 這個(gè) consumer 只消費(fèi) error 級別的日志
protected $queue = 'routing.error';
protected $routingKey = 'error';
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
/**
* @Consumer()
*/
class Demo2Consumer extends ConsumerMessage
{
protected $exchange = 'routing';
protected $type = Type::DIRECT;
// 這個(gè) consumer 消費(fèi)所有級別的日志
protected $queue = 'routing.all';
protected $routingKey = [
'info',
'warning',
'error',
];
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// producer
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = 'routing';
protected $type = Type::DIRECT;
public function __construct($data, $routingKey)
{
$this->routingKey = $routingKey;
$this->payload = $data;
}
}
// produce
$this->producer->produce(new DemoProducer('info'. date('Y-m-d H:i:s'), 'info'));
$this->producer->produce(new DemoProducer('warning'. date('Y-m-d H:i:s'), 'warning'));
$this->producer->produce(new DemoProducer('error'. date('Y-m-d H:i:s'), 'error'));
var_dump('done');
和的, 和上面的 routing 差不多
// consume
/**
* @Consumer()
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = 'topics';
protected $type = Type::TOPIC;
protected $queue = 'topics.t1';
// protected $routingKey = '#'; // all
// protected $routingKey = 'kern.*';
// protected $routingKey = '*.critical';
// protected $routingKey = 'kern.critical';
protected $routingKey = [
'kern.*',
'*.critical',
];
public function consume($data): string
{
var_dump($data);
return Result::ACK;
}
}
// produce
/**
* @Producer()
*/
class DemoProducer extends ProducerMessage
{
protected $exchange = 'topics';
protected $type = Type::TOPIC;
public function __construct($data, $routingKey)
{
$this->routingKey = $routingKey;
$this->payload = $data;
}
}
可以看到, 想要用 amqp, 自動(dòng)生成好代碼后, 改一改屬性就成, so easy~
再聊 amqp
amqp 難不難用? 至少基礎(chǔ)的使用還是很好掌握的, 下面有一張圖可供參考
把 producer consumer connection vhost channel exchange queue routing_key bind publish consume msg
幾個(gè)概念了解了, 基礎(chǔ)使用就能很順手. 而在 hyperf 中, 得益于對一些常用使用的方式的封裝, 自動(dòng)生成代碼 + 改改類屬性 就能把 amqp 用起來 !
寫在最后
無他, 唯手熟爾.