簡介
我們知道proxy protocol是haproxy提出的一個(gè)代理協(xié)議酌心,通過這個(gè)協(xié)議,所有實(shí)現(xiàn)這個(gè)協(xié)議的proxy或者LBS,都可以附帶真實(shí)客戶端的IP地址和端口號(hào)友存,這使得proxy protocol在實(shí)際應(yīng)用中非常有用舒萎。
這么優(yōu)秀的協(xié)議当纱,沒有理由netty不支持畏铆。本文將會(huì)談一下netty中對proxy protoco代理協(xié)議的支持杂靶。
netty對proxy protocol協(xié)議的支持
proxy protocol協(xié)議其實(shí)很簡單,就是在請求前面帶了proxy header信息备韧。
在netty中這個(gè)header信息叫做HAProxyMessage:
public final class HAProxyMessage extends AbstractReferenceCounted {
HAProxyMessage是一個(gè)ReferenceCounted,這一點(diǎn)和ByteBuf很類似痪枫,說明HAProxyMessage保留著和ByteBuf很類似的特性织堂。
根據(jù)proxy protocol協(xié)議,該協(xié)議可以分為兩個(gè)版本奶陈,分別是v1和v2,其中v1版本是文本協(xié)議易阳,而v2版本支持二進(jìn)制的格式。
顯然從代碼編寫和調(diào)試的角度來看v1更加友好吃粒,但是從程序的角度來看,v2可能性能更高潦俺。
HAProxyMessage中有個(gè)專門的HAProxyProtocolVersion類,來表示proxy protocol的版本信息:
public enum HAProxyProtocolVersion {
V1(VERSION_ONE_BYTE),
V2(VERSION_TWO_BYTE);
HAProxyProtocolVersion是一個(gè)枚舉類,在它里面定義了和proxy協(xié)議相對應(yīng)的兩個(gè)版本號(hào)事示。
在版本號(hào)之后是command早像,在netty中用HAProxyCommand來表示:
public enum HAProxyCommand {
LOCAL(HAProxyConstants.COMMAND_LOCAL_BYTE),
PROXY(HAProxyConstants.COMMAND_PROXY_BYTE);
HAProxyCommand也是一個(gè)枚舉類,里面定義了兩個(gè)command的值肖爵,分別是local和proxy卢鹦。
其中l(wèi)ocal表示該請求是代理服務(wù)器主動(dòng)發(fā)起的,而不是客戶端發(fā)起的劝堪,比如監(jiān)控檢測等請求冀自。
proxy表示該請求是一個(gè)代理請求。
接下來是AddressFamily和TransportProtocol,這兩個(gè)字段用同一個(gè)byte來表示秒啦,所以這兩個(gè)類都是HAProxyProxiedProtocol的內(nèi)部類熬粗。
先看下AddressFamily的定義:
public enum AddressFamily {
AF_UNSPEC(AF_UNSPEC_BYTE),
AF_IPv4(AF_IPV4_BYTE),
AF_IPv6(AF_IPV6_BYTE),
AF_UNIX(AF_UNIX_BYTE);
AddressFamily中定義了4個(gè)address family類型,分別是unspec,ipv4,ipv6和unix余境。分別對應(yīng)未知family,ipv4,ipv6和unix domain socket驯嘱。
再看下TransportProtocol的定義:
public enum TransportProtocol {
UNSPEC(TRANSPORT_UNSPEC_BYTE),
STREAM(TRANSPORT_STREAM_BYTE),
DGRAM(TRANSPORT_DGRAM_BYTE);
TransportProtocol有3個(gè)值,分別是unspec,stream和dgram姆蘸。分別對應(yīng)未知協(xié)議祝蝠,http/https協(xié)議,udp/tcp協(xié)議绣张。
因?yàn)锳ddressFamily和TransportProtocol實(shí)際上是同一個(gè)byte答渔,所以經(jīng)過組合之后可以得到下面的幾個(gè)枚舉值:
UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC),
TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM),
TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM),
UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM),
UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM),
UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM),
UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);
以上的枚舉值也是HAProxyProxiedProtocol中定義的值。
接下就是源ip地址侥涵,目標(biāo)地ip地址沼撕,源端口和目標(biāo)端口這幾個(gè)值,定義為屬性表示如下:
private final String sourceAddress;
private final String destinationAddress;
private final int sourcePort;
private final int destinationPort;
最后芜飘,proxy protocol中還可以包含額外的字段tlv务豺,tlv在netty中也是一種byteBuf,使用HAProxyTLV表示:
public class HAProxyTLV extends DefaultByteBufHolder
因?yàn)閠lv是key value結(jié)構(gòu)嗦明,所以看下HAProxyTLV的構(gòu)造函數(shù):
public HAProxyTLV(Type type, ByteBuf content) {
this(type, Type.byteValueForType(type), content);
}
HAProxyTLV接受一個(gè)type和byteBuf的value笼沥。
Type是一個(gè)枚舉類,在netty中可以支持下面的值:
public enum Type {
PP2_TYPE_ALPN,
PP2_TYPE_AUTHORITY,
PP2_TYPE_SSL,
PP2_TYPE_SSL_VERSION,
PP2_TYPE_SSL_CN,
PP2_TYPE_NETNS,
OTHER;
在HAProxyMessage中娶牌,tlv是一個(gè)list來保存的:
private final List<HAProxyTLV> tlvs;
到此奔浅,所有HAProxyMessage所需要的參數(shù)都齊了,我們看下HAProxyMessage的構(gòu)造函數(shù):
public HAProxyMessage(
HAProxyProtocolVersion protocolVersion, HAProxyCommand command, HAProxyProxiedProtocol proxiedProtocol,
String sourceAddress, String destinationAddress, int sourcePort, int destinationPort,
List<? extends HAProxyTLV> tlvs)
HAProxyMessage會(huì)將所有的參數(shù)都存儲(chǔ)到本地的變量中诗良,供后續(xù)使用汹桦。
因?yàn)閜roxy protocol有兩個(gè)版本,v1和v2鉴裹,所以HAProxyMessage中提供了兩個(gè)將header編碼為AProxyMessage對象的方法舞骆,分別是:
static HAProxyMessage decodeHeader(ByteBuf header)
和:
static HAProxyMessage decodeHeader(String header)
有了proxy protocol的java表示之后钥弯,我們再來看一下HAProxyMessage的編碼解碼器。
HAProxyMessage的編碼解碼器
netty對HAProxyMessage對象的支持表現(xiàn)在兩個(gè)地方督禽,netty提供了兩個(gè)類分別對HAProxyMessage進(jìn)行編碼和解碼脆霎,這兩個(gè)類是HAProxyMessageEncoder和HAProxyMessageDecoder。
先看一下HAProxyMessageEncoder:
public final class HAProxyMessageEncoder extends MessageToByteEncoder<HAProxyMessage>
HAProxyMessageEncoder繼承自MessageToByteEncoder赂蠢,傳入的泛型是HAProxyMessage绪穆,表示是將HAProxyMessage編碼成為ByteBuf。
它的encode方法很簡單虱岂,根據(jù)HAProxyMessage傳入的message版本信息玖院,分別進(jìn)行編碼:
protected void encode(ChannelHandlerContext ctx, HAProxyMessage msg, ByteBuf out) throws Exception {
switch (msg.protocolVersion()) {
case V1:
encodeV1(msg, out);
break;
case V2:
encodeV2(msg, out);
break;
default:
throw new HAProxyProtocolException("Unsupported version: " + msg.protocolVersion());
}
}
HAProxyMessageDecoder是跟HAProxyMessageEncoder相反的動(dòng)作,是將接收到的ByteBuf解析成為HAProxyMessage:
public class HAProxyMessageDecoder extends ByteToMessageDecoder
因?yàn)镠AProxyMessage有兩個(gè)版本第岖,那么怎么判斷接收到的ByeBuf是哪個(gè)版本呢难菌?
其實(shí)很簡單,因?yàn)関1版本和v2版本的開始字符是不一樣的蔑滓,v1版本的開頭是一個(gè)text:"PROXY", v2版本的開頭是一個(gè)固定的二進(jìn)制串,如下所示:
static final byte[] BINARY_PREFIX = {
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x00,
(byte) 0x0D,
(byte) 0x0A,
(byte) 0x51,
(byte) 0x55,
(byte) 0x49,
(byte) 0x54,
(byte) 0x0A
};
static final byte[] TEXT_PREFIX = {
(byte) 'P',
(byte) 'R',
(byte) 'O',
(byte) 'X',
(byte) 'Y',
};
看下它的decode方法實(shí)現(xiàn):
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (version == -1) {
if ((version = findVersion(in)) == -1) {
return;
}
}
ByteBuf decoded;
if (version == 1) {
decoded = decodeLine(ctx, in);
} else {
decoded = decodeStruct(ctx, in);
}
if (decoded != null) {
finished = true;
try {
if (version == 1) {
out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII)));
} else {
out.add(HAProxyMessage.decodeHeader(decoded));
}
} catch (HAProxyProtocolException e) {
fail(ctx, null, e);
}
}
}
上面代碼的邏輯是先從ByteBuf中根據(jù)版本號(hào)decode出header信息放到ByteBuf中郊酒。
然后再根據(jù)版本號(hào)的不同,分別調(diào)用HAProxyMessage的兩個(gè)不同版本的decodeHeader方法進(jìn)行解碼键袱。最終得到HAProxyMessage燎窘。
netty中proxy protocol的代碼示例
有了netty對proxy protocol的支持,那么在netty中搭建支持proxy protocol的服務(wù)器和客戶端就很容易了蹄咖。
先看一下如何搭建支持proxy protocol的服務(wù)器:
private static void startServer(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerInitializer());
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
代碼和常規(guī)的netty server一樣褐健,這里使用了NioEventLoopGroup和NioServerSocketChannel,搭建了一個(gè)支持TCP協(xié)議的netty服務(wù)器。
ServerInitializer中包含了netty自帶的HAProxy編碼器和自定義的消息處理器:
class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.DEBUG),
new HAProxyMessageDecoder(),
new SimpleChannelInboundHandler() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HAProxyMessage) {
log.info("proxy message is : {}", msg);
} else if (msg instanceof ByteBuf) {
log.info("bytebuf message is : {}", ByteBufUtil.prettyHexDump((ByteBuf) msg));
}
}
});
}
}
這里使用netty自帶的HAProxyMessageDecoder澜汤,用來將ByteBuf消息解碼為HAProxyMessage蚜迅,然后在自定義的SimpleChannelInboundHandler中對HAProxyMessage進(jìn)行處理。
這里的服務(wù)器可以處理兩種消息俊抵,一種是HAProxyMessage谁不,一種是原始的ByteBuf。處理的結(jié)果就是將消息打印出來徽诲。
然后看下客戶端的定義:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientHander());
Channel ch = b.connect(host, port).sync().channel();
客戶端使用的是EventLoopGroup和NioSocketChannel刹帕,是基于TCP協(xié)議的請求。
這里添加了自定義的handler:ClientHander,ClientHander繼承自ChannelOutboundHandlerAdapter用來對client發(fā)出的消息進(jìn)行處理谎替。
這里看一下它的handlerAdded方法:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().addBefore(ctx.name(), null, HAProxyMessageEncoder.INSTANCE);
super.handlerAdded(ctx);
}
可以看到handlerAdded方法向channelPipeline中添加了HAProxyMessageEncoder轩拨,用于編碼HAProxyMessage。
因?yàn)閷τ谝粋€(gè)connection來說院喜,HAProxyMessage只需要用到一次,后續(xù)的正常消息就不需要這個(gè)編碼器了晕翠,所以我們需要在write方法中監(jiān)聽HAProxyMessage的狀態(tài)喷舀,如果寫入成功之后砍濒,就從pipeline中移出HAProxyMessageEncoder和ClientHander。
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ChannelFuture future1 = ctx.write(msg, promise);
if (msg instanceof HAProxyMessage) {
future1.addListener((ChannelFutureListener) future2 -> {
if (future2.isSuccess()) {
ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE);
ctx.pipeline().remove(ClientHander.this);
} else {
ctx.close();
}
});
}
}
最后我們構(gòu)建了一個(gè)虛擬的HAProxyMessage硫麻,然后通過netty客戶端進(jìn)行發(fā)送:
HAProxyMessage message = new HAProxyMessage(
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4,
"127.0.0.1", "127.0.0.2", 8000, 9000);
ch.writeAndFlush(message).sync();
ch.writeAndFlush(Unpooled.copiedBuffer("this is a proxy protocol message!", CharsetUtil.UTF_8)).sync();
ch.close().sync();
總結(jié)
上面的代碼只是一個(gè)簡單的模擬proxy protocol在netty中的使用情況爸邢,并不代表上面的代碼就可以在實(shí)際的項(xiàng)目中應(yīng)用了。如果你想使用的話拿愧,可以在下面的代碼上面繼續(xù)豐富和完善杠河。
本文的代碼,大家可以參考: