前言
最近在用netty開(kāi)發(fā)項(xiàng)目慷嗜,主要用于tcp通信穷缤,處理邏輯時(shí)發(fā)現(xiàn)沒(méi)有很好的同步獲取response的機(jī)制为迈,研究了一下現(xiàn)在主流的方式三椿,就自定義實(shí)現(xiàn)了一下。
SyncHttpResponse
這里以http服務(wù)為例葫辐,tcp的協(xié)議可以類似使用搜锰,完整的代碼還是放在Github上了。
- 定義ClientHandler耿战,繼承了ChannelInboundHandlerAdapter蛋叼,并保存了ChannelHandlerContext來(lái)實(shí)現(xiàn)發(fā)送請(qǐng)求的功能。
同步主要是用到了ChannelPromise剂陡,發(fā)送完請(qǐng)求后狈涮,會(huì)新生成一個(gè)ChannelPromise并返回租冠,并在接收到完整的response后setSuccess。
public static class ClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private String data;
private long readByte;
private long contentLength;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.ctx = ctx;
}
public ChannelPromise sendMessage(Object message) {
if (ctx == null)
throw new IllegalStateException();
promise = ctx.writeAndFlush(message).channel().newPromise();
return promise;
}
public String getData() {
return data;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
contentLength = Long.parseLong(response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
readByte = 0;
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
readByte += buf.readableBytes();
data += buf.toString(Charset.forName("gb2312"));
if (readByte >= contentLength) {
promise.setSuccess();
}
buf.release();
}
}
}
- client薯嗤,這里開(kāi)啟一個(gè)Bootstrap顽爹,內(nèi)置了HttpRequestEncoder和HttpResponseDecoder來(lái)實(shí)現(xiàn)http的decode和encode。connect方法會(huì)鏈接到指定的host骆姐,這里會(huì)有一個(gè)死循環(huán)镜粤,直到鏈接激活為止。getBody方法會(huì)在發(fā)送HttpRequest后await到ChannelPromise的完成玻褪,然后取出讀到的數(shù)據(jù)肉渴。
public static class HttpClient {
private ClientHandler clientHandler = new ClientHandler();
private String url;
private URI uri;
public HttpClient(String url) {
this.url = url;
}
public void connect() throws Exception {
uri = new URI(url);
EventLoopGroup loopGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder()).addLast(new HttpResponseDecoder()).addLast(clientHandler);
}
});
Channel channel = b.connect(uri.getHost(), uri.getPort() < 0 ? 80 : uri.getPort()).sync().channel();
while (!channel.isActive()) {
Thread.sleep(1000);
}
}
public String getBody() throws Exception {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
request.headers().set(HttpHeaders.Names.HOST, uri.getHost());
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
ChannelPromise promise = clientHandler.sendMessage(request);
promise.await();
return clientHandler.getData();
}
}
- 程序入口。
public class SyncHttpResponse {
public static void main(String[] args) throws Exception {
HttpClient client = new HttpClient("http://www.baidu.com");
client.connect();
System.out.println(client.getBody());
}
}
運(yùn)行程序就會(huì)打印出response的內(nèi)容了带射。
結(jié)語(yǔ)
netty是異步io框架同规,理論上不提倡同步的處理,但是某些情況下強(qiáng)依賴同步的結(jié)果窟社,可以采用這種方式券勺。