模擬MQTT發布客戶端
上代碼:
1.pubClient
package com.MyMQTTServerAndClient.publishClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.eclipse.moquette.parser.netty.MQTTDecoder;
import org.eclipse.moquette.parser.netty.MQTTEncoder;
/**
 * Command-line entry point that opens a TCP connection to an MQTT broker on
 * 127.0.0.1:7777 and hands the channel over to {@link PubMQTTClientHandler}.
 */
public class pubClient {

    public static void main(String[] args) {
        pubClient.startMqttClient();
    }

    /**
     * Connects to the broker and blocks the calling thread until the channel is closed.
     * The event loop group is always shut down before this method returns, whether the
     * connection ended normally or with an error, so its threads do not keep the JVM alive.
     */
    public static void startMqttClient() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group);
            b.channel(NioSocketChannel.class);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("log", new LoggingHandler());
                    pipeline.addLast("decoder", new MQTTDecoder()); // inbound bytes -> MQTT messages
                    pipeline.addLast("encoder", new MQTTEncoder()); // MQTT messages -> outbound bytes
                    pipeline.addLast("handler", new PubMQTTClientHandler());
                }
            });
            ChannelFuture ch = b.connect("127.0.0.1", 7777).sync();
            // Block until the connection is closed by either side.
            ch.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads on every exit path, not only on failure.
            group.shutdownGracefully();
        }
    }
}
2.PubMQTTClientHandler
package com.MyMQTTServerAndClient.publishClient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.eclipse.moquette.proto.messages.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Netty inbound handler implementing a minimal MQTT publishing client.
 *
 * <p>Only QoS 0 and QoS 1 are supported for now. Note from the original author: with
 * QoS 0 the server sees a {@code null} message ID after receiving the message
 * (presumably because the codec does not transmit it).
 *
 * <p>Flow: on connect a CONNECT message is sent; after a successful CONNACK a periodic
 * heartbeat is started; after the first PINGRESP the demo topics are queued. Three
 * background workers then (1) send QoS 0 messages, (2) send and periodically re-send
 * QoS 1 messages until acknowledged, and (3) process received PUBACKs. The workers are
 * stopped when the channel becomes inactive.
 */
public class PubMQTTClientHandler extends SimpleChannelInboundHandler<AbstractMessage> {
    private static final Logger logger = LoggerFactory.getLogger(PubMQTTClientHandler.class);

    /** One thread per never-ending worker task: QoS 0 sender, QoS 1 sender, PUBACK processor. */
    private static final int WORKER_COUNT = 3;

    /** MQTT message identifiers are 16-bit and must be non-zero, i.e. 1..65535. */
    private static final int MAX_MESSAGE_ID = 65535;

    /** True until the first PINGRESP has been handled (shared by all handler instances). */
    private static final AtomicBoolean firstPingResp = new AtomicBoolean(true);

    private final String userName = "admin";
    private final String passWord = "password";

    /** Heartbeat task handle, used to cancel the heartbeat; null until CONNACK succeeds. */
    ScheduledFuture<?> scheduledFuture;

    /** QoS 0 messages waiting to be published. */
    LinkedBlockingDeque<PublishMessage> zeroQosPubQueue = new LinkedBlockingDeque<>();
    /** QoS 1 messages waiting to be published or re-published. */
    LinkedBlockingDeque<PublishMessage> oneQosPubQueue = new LinkedBlockingDeque<>();
    /** PUBACKs received for QoS 1 messages, waiting to be processed. */
    LinkedBlockingDeque<PubAckMessage> oneQosPubAckQueue = new LinkedBlockingDeque<>();
    /** QoS 1 messages still awaiting a PUBACK, keyed by message ID. */
    ConcurrentHashMap<Integer, PublishMessage> alreadySendMap = new ConcurrentHashMap<>();

    ExecutorService executorService;
    /** Never counted down; only used as an interruptible timed wait by the QoS 0 worker. */
    CountDownLatch countDownLatch = new CountDownLatch(1);
    List<java.util.concurrent.Future<?>> FutureList = new ArrayList<>();

    /** Source of message IDs; holds a value in 0..MAX_MESSAGE_ID-1. */
    private final java.util.concurrent.atomic.AtomicInteger messageIdSequence =
            new java.util.concurrent.atomic.AtomicInteger();

    /** Connected channel; written by the event loop, read by the workers, hence volatile. */
    private volatile Channel channel;
    /** Cleared on disconnect to stop the workers; volatile for cross-thread visibility. */
    private volatile boolean running = true;

    /**
     * Creates the handler and starts the three background workers.
     */
    public PubMQTTClientHandler() {
        // Each task loops until shutdown, so the pool needs exactly one thread per task;
        // sizing it by CPU count would starve a worker on machines with fewer than 3 cores.
        executorService = Executors.newFixedThreadPool(WORKER_COUNT);
        FutureList.add(executorService.submit(this::runZeroQosPublisher));
        FutureList.add(executorService.submit(this::runOneQosPublisher));
        FutureList.add(executorService.submit(this::runPubAckProcessor));
    }

    /** Worker: sends every queued QoS 0 message once, then waits up to one second. */
    private void runZeroQosPublisher() {
        while (running) {
            PublishMessage publishMessage;
            while ((publishMessage = zeroQosPubQueue.pollFirst()) != null) {
                Channel target = channel;
                if (target == null) {
                    // Not connected (any more): keep the message and retry later.
                    zeroQosPubQueue.addFirst(publishMessage);
                    break;
                }
                target.writeAndFlush(publishMessage);
                logger.info("publishMessage {} publish success {}", publishMessage.getTopicName(), publishMessage.getMessageID());
            }
            try {
                // Pace the loop so it does not spin.
                countDownLatch.await(1000, MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Worker: sends the QoS 1 message at the head of the queue, then moves it to the tail
     * so that it is sent again later unless a PUBACK removed it in the meantime.
     */
    private void runOneQosPublisher() {
        try {
            while (running) {
                // Timed blocking poll: idles without burning CPU while the queue is empty.
                PublishMessage publishMessage = oneQosPubQueue.pollFirst(1, SECONDS);
                if (publishMessage == null) {
                    continue;
                }
                Integer messageID = publishMessage.getMessageID();
                if (!alreadySendMap.containsKey(messageID)) {
                    // Acknowledged while it was waiting in the queue: nothing left to do.
                    continue;
                }
                Channel target = channel;
                if (target == null) {
                    // Not connected (any more): keep the message and retry later.
                    oneQosPubQueue.addFirst(publishMessage);
                    SECONDS.sleep(1);
                    continue;
                }
                target.writeAndFlush(publishMessage);
                logger.info("publishMessage {} publish success {}", publishMessage.getTopicName(), publishMessage.getMessageID());
                oneQosPubQueue.addLast(publishMessage);
                // The PUBACK may have been processed while the message was outside the
                // queue; in that case the ack worker could not remove it, so do it here.
                if (!alreadySendMap.containsKey(messageID)) {
                    oneQosPubQueue.remove(publishMessage);
                }
                // Throttle sending.
                SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Worker: matches received PUBACKs against pending QoS 1 messages and retires them. */
    private void runPubAckProcessor() {
        try {
            while (running) {
                PubAckMessage pubAckMessage = oneQosPubAckQueue.pollFirst(1, SECONDS);
                if (pubAckMessage == null) {
                    continue;
                }
                Integer messageID = pubAckMessage.getMessageID();
                // Remove from the pending map first, then from the send queue; the sender
                // re-checks the map after re-queuing, which closes the race between the two.
                PublishMessage acknowledged = messageID == null ? null : alreadySendMap.remove(messageID);
                if (acknowledged != null) {
                    oneQosPubQueue.remove(acknowledged);
                } else {
                    logger.error("receive error PubAckMessage {}",
                            pubAckMessage.getMessageID());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The throwable is passed as the trailing argument so the stack trace is logged.
        logger.error("error message", cause);
    }

    /**
     * Sends the MQTT CONNECT message as soon as the TCP connection is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("client "+ctx.channel().remoteAddress()+" active");
        ConnectMessage connectMessage = new ConnectMessage();
        connectMessage.setProcotolVersion((byte) 3);
        connectMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
        connectMessage.setCleanSession(true);
        connectMessage.setClientID(String.valueOf(UUID.randomUUID()));
        connectMessage.setKeepAlive(60);
        connectMessage.setWillFlag(false);
        connectMessage.setUserFlag(true);
        connectMessage.setUsername(this.userName);
        connectMessage.setPasswordFlag(true);
        connectMessage.setPassword(this.passWord);
        ctx.writeAndFlush(connectMessage, ctx.newPromise().addListener((future) -> {
            if (future.cause() == null) {
                logger.info("send connectMessage success");
            } else {
                logger.error("send connectMessage error", future.cause());
            }
        }));
    }

    /**
     * Stops the heartbeat and the background workers when the connection is lost.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.error("channelInactive " +ctx.channel().remoteAddress());
        // Stop the workers before dropping the channel reference they use.
        running = false;
        channel = null;
        // The heartbeat only exists once a successful CONNACK has been received.
        if (scheduledFuture != null) {
            boolean cancel = scheduledFuture.cancel(true);
            logger.info("{} client canal heart beat {}", ctx.channel().localAddress(), cancel);
        }
        // Interrupt workers that are sleeping or polling so their threads terminate.
        executorService.shutdownNow();
        ctx.close(ctx.newPromise().addListener((future) ->
                logger.info("channel {} close ", ctx.channel().localAddress())));
    }

    /**
     * Dispatches a decoded MQTT message by type; types other than CONNACK, PINGRESP and
     * PUBACK are only logged.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, AbstractMessage msg) throws Exception {
        byte messageType = msg.getMessageType();
        switch (messageType) {
            case AbstractMessage.CONNACK:
                processConnectAckResp(ctx, msg);
                break;
            case AbstractMessage.PINGRESP:
                processPingResp();
                break;
            case AbstractMessage.PUBACK:
                processPubAck(ctx, msg);
                break;
            default:
                logger.info("message type "+msg.getMessageType());
                break;
        }
    }

    /**
     * Queues a PUBACK (received for a topic published with QoS 1) for the ack worker.
     *
     * @param ctx channel context the message arrived on
     * @param msg the received message; must be a {@link PubAckMessage}
     */
    private void processPubAck(ChannelHandlerContext ctx, AbstractMessage msg) {
        PubAckMessage pubAckMessage = (PubAckMessage) msg;
        if (oneQosPubAckQueue.offerLast(pubAckMessage)) {
            logger.info("receive PubAckMessage {} from MQTTServer,and insert queue success",pubAckMessage.getMessageID());
        } else {
            logger.error("failed insert PubAckMessage {} into queue",pubAckMessage.getMessageID());
        }
    }

    /**
     * Handles a PINGRESP. Topics are only published after the first PINGRESP from the
     * server; later responses are just logged.
     */
    private void processPingResp() {
        if (firstPingResp.compareAndSet(true, false)) {
            publishTopic("mqttClient/topicOne1", AbstractMessage.QOSType.LEAST_ONE);
            publishTopic("mqttClient/topicOne2", AbstractMessage.QOSType.LEAST_ONE);
            publishTopic("mqttClient/topicTwo2", AbstractMessage.QOSType.LEAST_ONE);
            publishTopic("mqttClient/topicTwo3", AbstractMessage.QOSType.MOST_ONE);
            for (int i = 0; i < 5; i++) {
                publishTopic("mqttClient/topicTwo4", AbstractMessage.QOSType.MOST_ONE);
            }
            logger.debug("first receive ping response , start publish topic" );
        } else {
            logger.debug("ping response");
        }
    }

    /**
     * Returns the next message ID, cycling through 1..65535 so that an ID is never zero
     * and is not reused until the whole range has been handed out.
     */
    private int nextMessageId() {
        return messageIdSequence.getAndUpdate(current -> (current + 1) % MAX_MESSAGE_ID) + 1;
    }

    /**
     * Builds a publish message for the topic and queues it for sending (only QoS 0 and
     * QoS 1 are supported; other levels are logged and dropped).
     *
     * @param topic topic name to publish to
     * @param qos   quality of service level
     */
    private void publishTopic(String topic, AbstractMessage.QOSType qos) {
        PublishMessage publishMessage = new PublishMessage();
        publishMessage.setTopicName(topic);
        publishMessage.setQos(qos);
        // The ID is set for every QoS level, as before; the original author notes that
        // for QoS 0 the server ends up with a null message ID.
        int messageId = nextMessageId();
        publishMessage.setMessageID(messageId);
        publishMessage.setRetainFlag(false);
        publishMessage.setPayload(ByteBuffer.wrap(
                "I am payload".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        if (qos == AbstractMessage.QOSType.MOST_ONE) {
            if (zeroQosPubQueue.offer(publishMessage)) {
                logger.info("publishMessage {} insert zeroQosPubQueue success ",publishMessage.getTopicName());
            }
        } else if (qos == AbstractMessage.QOSType.LEAST_ONE) {
            // Register as pending before queuing so a PUBACK can always be matched.
            alreadySendMap.put(messageId, publishMessage);
            if (oneQosPubQueue.offer(publishMessage)) {
                logger.info("publishMessage {} insert oneQosPubQueue success ",publishMessage.getTopicName());
            } else {
                alreadySendMap.remove(messageId);
            }
        } else {
            logger.warn("unsupported qos {} for topic {}, message dropped", qos, topic);
        }
    }

    /**
     * Handles the CONNACK: on return code 0 remembers the channel and starts the periodic
     * heartbeat; on any other return code logs the failure and closes the connection.
     *
     * @param ctx channel context the message arrived on
     * @param msg the received message; must be a {@link ConnAckMessage}
     */
    private void processConnectAckResp(ChannelHandlerContext ctx, Object msg) {
        ConnAckMessage ackMessage = (ConnAckMessage) msg;
        if (ackMessage.getReturnCode() == (byte) 0) {
            channel = ctx.channel();
            logger.info("get connectAck success, start heart beat" );
            // Periodic heartbeat: first ping after 2 s, then every 55 s
            // (the keep-alive sent in CONNECT is 60 s).
            scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
                PingReqMessage pingReqMessage = new PingReqMessage();
                pingReqMessage.setQos(AbstractMessage.QOSType.MOST_ONE); // heartbeat uses QoS 0 for now
                ctx.writeAndFlush(pingReqMessage, ctx.newPromise().addListener((future) -> {
                    if (future.cause() == null) {
                        logger.info("send heartBeat success");
                    } else {
                        logger.error("send heartBeat error", future.cause());
                    }
                }));
            }, 2, 55, SECONDS);
        } else if (ackMessage.getReturnCode() == (byte) 4) {
            logger.error("get connectAck failed, cannot heart beat,username or password" );
            ctx.close();
        } else {
            // Any other refusal also leaves the session unusable, so do not keep it open.
            logger.error("get connectAck failed, return code {}", ackMessage.getReturnCode());
            ctx.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
}