本文僅展示核心代碼,全部代碼請移步:git-soomq
1、服務端
服務端的設計就非常簡單了,最核心的就是消息的存取,以及響應生產者和消費者的網絡請求
分為2部分:
1.1 消息文件
消息的存儲我們參考 kafka,並簡化其邏輯;因為是最簡單的 mq,我們只考慮單機的情況就行,每個 topic 存儲 2 個文件:
topicname.index
topicname.data
.index 文件存儲格式為:
消息順序號:消息截止位置
.data 文件按照順序存儲具體的消息
文件操作:
package com.esoo.mq.server.message;
import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import java.io.RandomAccessFile;
/**
 * Manages the two files that back a single topic: an index file and a data file.
 *
 * <p>The data file stores message bodies back to back in the order they were appended. The
 * index file stores one text line per message, {@code sequence:endPosition} followed by
 * {@code "\t\n"}, where {@code endPosition} is the data-file position immediately after that
 * message's body.
 *
 * <p>{@link #appendMsg} and {@link #readMsg} lock on this instance, so they are mutually
 * exclusive per {@code MessageFile}. NOTE(review): this assumes the factory hands out one
 * shared instance per topic - confirm against MessageFileFactory.
 */
public class MessageFile {

    private String topic;

    // Only exposed through getOffset/setOffset; not used by the read/write logic below.
    private Long offset;

    // Index file: one "sequence:endPosition" line per message.
    private RandomAccessFile indexFile = null;

    // Data file: raw message bodies, concatenated.
    private RandomAccessFile dataFile = null;

    /**
     * Appends the body of {@code in} to the data file and records it in the index file.
     * Called on behalf of producers.
     *
     * <p>The new message gets the sequence number of the last index line plus one, or 1 when
     * the index is empty.
     *
     * @param in command whose result carries the message body to store
     * @return the same command, with {@code success} set to {@code true}, or set to
     *     {@code false} with {@code exmsg} holding the exception message on failure
     */
    public ProcessorCommand appendMsg(ProcessorCommand in) {
        try {
            // Lock on this instance, not on the request's topic string: every request carries
            // its own String object, so locking on it would not exclude concurrent writers.
            synchronized (this) {
                // Work out the next sequence number BEFORE touching the data file, so that a
                // malformed index line cannot leave an un-indexed body behind.
                long sequence = 1;
                String lastLine = readLastLine(indexFile, null);
                if (lastLine != null && !lastLine.isEmpty()) {
                    String[] parts = lastLine.split(":");
                    sequence = Long.parseLong(parts[0].trim()) + 1;
                }

                // Append the body; the returned position is the end of this message.
                long endPosition = writeEndLine(dataFile, in.getResult().getBody());

                // Index entry format: sequence:endPosition
                String indexLine = sequence + ":" + endPosition + "\t\n";
                writeEndLine(indexFile, indexLine.getBytes());
                in.setSuccess(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;
    }

    /**
     * Reads one message body from the data file. Called on behalf of consumers.
     *
     * <p>The requested offset names the index entry that precedes the wanted message: the
     * body returned is the one whose sequence number is {@code offset + 1}, and it starts at
     * the end position recorded for sequence {@code offset} (or at 0 when no such entry
     * exists, e.g. for offset 0).
     *
     * @param in command whose result carries the requested offset
     * @return the same command; on success its result body is filled in and {@code success}
     *     is {@code true}; otherwise {@code success} is {@code false} and {@code exmsg} is
     *     {@code "offset is end"} when no body was found, or the exception message
     */
    public ProcessorCommand readMsg(ProcessorCommand in) {
        try {
            // See appendMsg for why the lock is this instance.
            synchronized (this) {
                // Where the wanted body starts in the data file.
                long startPosition = 0;
                // Length of the wanted body.
                long bodySize = 0;

                indexFile.seek(0);
                String line;
                while ((line = indexFile.readLine()) != null) {
                    if (line.trim().isEmpty()) {
                        // Tolerate blank lines instead of failing on number parsing.
                        continue;
                    }
                    String[] parts = line.split(":");
                    long sequence = Long.parseLong(parts[0].trim());
                    long endPosition = Long.parseLong(parts[1].trim());
                    if (sequence == in.getResult().getOffset()) {
                        startPosition = endPosition;
                    }
                    if (sequence == (in.getResult().getOffset() + 1)) {
                        bodySize = endPosition - startPosition;
                        // Sequence numbers only grow, so nothing further can match.
                        break;
                    }
                }

                if (bodySize == 0) {
                    in.setSuccess(false);
                    in.setExmsg("offset is end");
                    return in;
                }

                dataFile.seek(startPosition);
                byte[] body = new byte[Math.toIntExact(bodySize)];
                // readFully: a plain read() may return fewer bytes than requested.
                dataFile.readFully(body);
                in.getResult().setBody(body);
                in.setSuccess(true);
                System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
            }
        } catch (Exception e) {
            e.printStackTrace();
            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;
    }

    /**
     * Appends {@code msg} at the end of {@code file}.
     *
     * @param file file to append to
     * @param msg bytes to write
     * @return the file pointer after the write, i.e. the position just past the new bytes
     * @throws Exception if the file cannot be measured, positioned, or written
     */
    public static long writeEndLine(RandomAccessFile file, byte[] msg)
            throws Exception {
        file.seek(file.length());
        file.write(msg);
        return file.getFilePointer();
    }

    /**
     * Returns the last line of {@code file}, including its own trailing line terminator if
     * the file ends with one.
     *
     * @param file file to read from
     * @param charset charset name used to decode the bytes, or {@code null} for the platform
     *     default
     * @return the last line, or {@code ""} when the file is empty
     * @throws Exception if the file cannot be read or the charset name is not supported
     */
    public static String readLastLine(RandomAccessFile file, String charset) throws Exception {
        long len = file.length();
        if (len == 0L) {
            return "";
        }

        // Scan backwards for the '\n' that ends the previous line. The very last byte is
        // skipped because it may be the terminator of the last line itself.
        long start = 0;
        for (long pos = len - 2; pos >= 0; pos--) {
            file.seek(pos);
            if (file.readByte() == '\n') {
                // The last line begins right after that newline.
                start = pos + 1;
                break;
            }
        }

        file.seek(start);
        // Size the buffer from the line start (not from the newline) so that no unfilled
        // trailing byte ends up in the decoded string.
        byte[] bytes = new byte[(int) (len - start)];
        file.readFully(bytes);
        if (charset == null) {
            return new String(bytes);
        }
        return new String(bytes, charset);
    }

    /**
     * Placeholder: not implemented.
     *
     * @param file unused
     * @param charset unused
     * @return always {@code null}
     */
    public static String readByOffset(RandomAccessFile file, String charset) throws Exception {
        return null;
    }

    /** Returns the topic name set on this object. */
    public String getTopic() {
        return topic;
    }

    /** Sets the topic name. */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** Returns the stored offset value. */
    public Long getOffset() {
        return offset;
    }

    /** Sets the stored offset value. */
    public void setOffset(Long offset) {
        this.offset = offset;
    }

    /** Returns the index file handle. */
    public RandomAccessFile getIndexFile() {
        return indexFile;
    }

    /** Sets the index file handle. */
    public void setIndexFile(RandomAccessFile indexFile) {
        this.indexFile = indexFile;
    }

    /** Returns the data file handle. */
    public RandomAccessFile getDataFile() {
        return dataFile;
    }

    /** Sets the data file handle. */
    public void setDataFile(RandomAccessFile dataFile) {
        this.dataFile = dataFile;
    }
}
1.2 網絡編程
利用 netty 開放端口,響應生產者與消費者。每個消息包裝成一個 command,command 包含以下內容:
- 消息類型(消費/生產)
- 消息topic
- 消息體(生產時用)
- 消息順序號(消費時用)
- 處理結果(成功/失敗)
- 處理消息(失敗時添加原因)
網絡啟動
package com.esoo.mq.server;
import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class SooMQServer {
private static Integer serverPort=9870;
ServerBootstrap b = new ServerBootstrap();
public void start(){
//創(chuàng)建reactor 線程組
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 設(shè)置reactor 線程組
b.group(bossLoopGroup, workerLoopGroup);
//2 設(shè)置nio類型的channel
b.channel(NioServerSocketChannel.class);
//3 設(shè)置監(jiān)聽端口
b.localAddress(serverPort);
//4 設(shè)置通道的參數(shù)
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 裝配子通道流水線
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有連接到達(dá)時會創(chuàng)建一個channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水線添加一個handler處理器
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettySooMqServerOutHandler());
ch.pipeline().addLast(new NettySooMqServerHandler());
}
});
// 6 開始綁定server
// 通過調(diào)用sync同步方法阻塞直到綁定成功
ChannelFuture channelFuture = b.bind().sync();
System.out.println(" 服務(wù)器啟動成功,監(jiān)聽端口: " +
channelFuture.channel().localAddress());
// 7 等待通道關(guān)閉的異步任務(wù)結(jié)束
// 服務(wù)監(jiān)聽通道會一直等待通道關(guān)閉的異步任務(wù)結(jié)束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 優(yōu)雅關(guān)閉EventLoopGroup,
// 釋放掉所有資源包括創(chuàng)建的線程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
}
網絡邏輯分發
注意:回寫給客戶端的消息體類型必須與入參保持一致,否則 netty 無法解析
netty
package com.esoo.mq.server.netty.handler;
import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.processor.Processor;
import com.esoo.mq.server.processor.ProcessorFactory;
import io.netty.channel.*;
/**
 * Inbound handler that dispatches each received {@code ProcessorCommand} to the processor
 * registered for its type and writes the result back to the client.
 *
 * <p>When processing fails, the original command is sent back marked as failed so the client
 * still receives a reply of the same type it sent.
 */
@ChannelHandler.Sharable
public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Handles one decoded inbound message and replies on the same channel.
     *
     * @param ctx channel context used for the reply
     * @param msg decoded inbound object; expected to be a {@code ProcessorCommand}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Object response = process(ctx, msg);
            if (response == null) {
                // The message was not a command; there is no same-typed reply to send.
                return;
            }
            ChannelFuture f = ctx.writeAndFlush(response);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("msg ctx send");
                    if (!future.isSuccess() && future.cause() != null) {
                        // Surface write failures instead of reporting them as sent.
                        future.cause().printStackTrace();
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Runs the processor for {@code msg}.
     *
     * @return the processor's result; the incoming command marked as failed when processing
     *     threw or produced nothing; or {@code null} when {@code msg} could not be treated
     *     as a command at all
     */
    private Object process(ChannelHandlerContext ctx, Object msg) {
        ProcessorCommand command = null;
        try {
            command = (ProcessorCommand) msg;
            System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
            Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
            Object response = processor.handle(command);
            if (response == null) {
                throw new IllegalStateException("processor returned no response");
            }
            return response;
        } catch (Exception e) {
            e.printStackTrace();
            if (command == null) {
                return null;
            }
            // Report the failure to the client rather than leaving it without a reply.
            command.setSuccess(false);
            command.setExmsg(e.getMessage() != null ? e.getMessage() : e.toString());
            return command;
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    /** Prints the cause's message and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }
}
生產者
package com.esoo.mq.server.processor;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;
/**
 * Processor for producer requests: appends the command's message to the file of its topic.
 */
public class SendMessageProcessor implements Processor<Message,Message> {

    /**
     * Looks up the topic file for the command's message and appends the message to it.
     *
     * @param task command carrying the message to store
     * @return the command returned by {@code MessageFile.appendMsg}
     */
    @Override
    public ProcessorCommand handle(ProcessorCommand task) {
        Message message = task.getResult();
        MessageFile topicFile = MessageFileFactory.getTopicFile(message.getTopic());
        return topicFile.appendMsg(task);
    }
}
消費者
package com.esoo.mq.server.processor;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;
public class ReadMessageProcessor implements Processor<Message,Message> {
@Override
public ProcessorCommand handle(ProcessorCommand task) {
Message msg = task.getResult();
MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
task = file.readMsg(task);
return task;
}
}