??相信大家在日常工作中會遇到這種情況,一臺機器上兩個進程通過socket相互通信寇僧,server端重啟后端口沖突直接起不來了...此時rd同學心里一緊摊腋,是不是寫出bug了趕緊查日志發(fā)現(xiàn)是端口沖突了筆者目前在貓眼從事servicemesh相關(guān)研發(fā)工作,我們公司使用新美大的機器婉宰,無法擁有對機器的控制權(quán)歌豺,因此也就無法基于k8s來做servicemesh。簡單來說心包,sdk通過http的方式與mesh交互做服務注冊發(fā)現(xiàn)类咧,那sdk如何發(fā)現(xiàn)mesh 就成了一個問題。
??如上圖所示蟹腾,最開始sdk通過本地文件發(fā)現(xiàn)mesh的admin地址痕惋,mesh重啟后為防止端口沖突問題,會選一個未占用的端口進行監(jiān)聽娃殖,然后把最新的admin地址寫到本地文件值戳。sdk要能及時的刷新mesh最新的admin地址,所以sdk內(nèi)部要有一個定時任務定期刷新admin地址炉爆。由此來看sdk的邏輯就變得非常重了堕虹,sdk本身的定位就是序列化和反序列化數(shù)據(jù)發(fā)送給mesh。那么有什么好的方法能解決上面這么惡心的問題嗎芬首?當然有啦赴捞,通過uds就可以。
??Unix domain socket 又叫 IPC(inter-process communication 進程間通信) socket郁稍,用于實現(xiàn)同一主機上的進程間通信赦政。socket 原本是為網(wǎng)絡通訊設計的,但后來在 socket 的框架上發(fā)展出一種 IPC 機制耀怜,就是 UNIX domain socket恢着。雖然網(wǎng)絡 socket 也可用于同一臺主機的進程間通訊(通過 loopback 地址 127.0.0.1)桐愉,但是 UNIX domain socket 用于 IPC 更有效率:不需要經(jīng)過網(wǎng)絡協(xié)議棧,不需要打包拆包掰派、計算校驗和从诲、維護序號和應答等,只是將應用層數(shù)據(jù)從一個進程拷貝到另一個進程碗淌。這是因為盏求,IPC 機制本質(zhì)上是可靠的通訊,而網(wǎng)絡協(xié)議是為不可靠的通訊設計的亿眠。有了uds進行本機通信碎罚,再也不用擔心mesh重啟或啟動時端口沖突的問題了。使用uds之后sdk與mesh的交互方式纳像。這樣sdk直接拿到uds路徑new uds客戶端調(diào)用mesh就可以了荆烈,很方便
??sdk的語言有多種,我拿Java來舉例竟趾。mesh 使用go語言實現(xiàn)憔购,很容易實現(xiàn)一個應用層http協(xié)議傳輸層uds協(xié)議的server。
func main() {
RunServer()
}
?
var (
// 聲明 Unix 套接字的地址
serverAddr = &net.UnixAddr{Name: "/opt/test.sock", Net: "unix"}
)
?
func RunServer() {
// unlink 系統(tǒng)調(diào)用比較特殊岔帽。關(guān)于它的描述中有一點:如果這個文件是一個 unix socket玫鸟,它會被移除,但是打開它的進程可以繼續(xù)使用它犀勒。也就是說新舊進程都會在這個地址監(jiān)聽屎飘。
syscall.Unlink(serverAddr.Name)
lis, err := net.ListenUnix("unix", serverAddr)
if err != nil {
fmt.Println("ListenUnix", err)
return
}
http.HandleFunc("/get", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
query := req.URL.Query()
get := query.Get("key")
fmt.Printf("server get key = %s value = %s \n", "key", get)
builder := strings.Builder{}
for i := 0; i < 3; i++ {
builder.WriteString(strconv.Itoa(i))
}
s := req.Header.Get("sequenceid")
w.Header().Add("sequenceid", s)
w.Write([]byte(builder.String()))
}))
?
http.HandleFunc("/post", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
bytes, err := ioutil.ReadAll(req.Body)
req.Body.Close()
if err != nil {
return
}
h := req.Header.Get("sequenceid")
contentType := req.Header.Get("Content-Type")
fmt.Println(contentType)
w.Header().Add("sequenceid", h)
fmt.Printf("server post receive request %s \n", string(bytes))
w.Write([]byte("post success"))
}))
svr := &http.Server{Handler: http.DefaultServeMux}
err = svr.Serve(lis)
if err != nil {
fmt.Println("Serve err:", err)
}
}
?
??而Java語言就相對麻煩了,在做這一塊的時候在網(wǎng)上調(diào)研沒有找到什么資料贾费,最終使用netty封裝了一個HTTPUdsClient的包钦购。因為netty是異步的,所以要把異步轉(zhuǎn)同步褂萧,這里只是給出一個簡單的demo并沒有轉(zhuǎn)同步押桃。那么異步轉(zhuǎn)同步如何實現(xiàn)呢?我的做法是在http header中添加Sequenceid导犹,channel中發(fā)完數(shù)據(jù)后使用CountDownLatch wait 等待唱凯,當mesh 返回數(shù)據(jù)后解碼進入到Handler中,觸發(fā)CountDownLatch 的countDown操作谎痢,很容易就異步轉(zhuǎn)同步了波丰。還有一點說明:每次http 響應收到后都會把uds連接關(guān)閉掉。
public class NettyUdsHttpClient {
public static void main(String[] args) throws Exception {
final NettyUdsHttpClient nettyUdsHttpClient = new NettyUdsHttpClient();
nettyUdsHttpClient.request("/opt/test.sock");
}
private Bootstrap b = null;
private static EventLoopGroup workerGroup = null;
public NettyUdsHttpClient() {
EventLoopGroup workerGroup = null;
Class domainSocketChannelClazz = null;
if (Epoll.isAvailable()) {
domainSocketChannelClazz = EpollDomainSocketChannel.class;
System.out.println("Epoll.isAvailable");
workerGroup = new EpollEventLoopGroup(1);
} else if (KQueue.isAvailable()) {
System.out.println("KQueue.isAvailable");
workerGroup = new KQueueEventLoopGroup(1);
domainSocketChannelClazz = KQueueDomainSocketChannel.class;
} else {
System.out.println("use NioEventLoopGroup");
workerGroup = new NioEventLoopGroup(1);
domainSocketChannelClazz = NioSocketChannel.class;
}
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(domainSocketChannelClazz);
b.option(ChannelOption.SO_KEEPALIVE, false);
b.handler(new ChannelInitializer<DomainSocketChannel>() {
@Override
public void initChannel(DomainSocketChannel ch) throws Exception {
// 客戶端接收到的是httpResponse響應舶得,所以要使用HttpResponseDecoder進行解碼
ch.pipeline().addLast(new HttpResponseDecoder());
// 客戶端發(fā)送的是httprequest,所以要使用HttpRequestEncoder進行編碼
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
private Map<String, String> headerMap = new HashMap<>();
private int statusCode;
private StringBuilder contentStr = new StringBuilder();
private int currentSequenceId;
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
DefaultHttpResponse response = (DefaultHttpResponse) msg;
this.statusCode = response.status().code();
HttpHeaders headers = response.headers();
Integer sequenceId = headers.getInt("Sequenceid");
if (sequenceId != null) {
this.currentSequenceId = sequenceId;
}
Iterator<Map.Entry<String, String>> headerIterator = headers.iteratorAsString();
// 封裝header
while (headerIterator.hasNext()) {
Map.Entry<String, String> header = headerIterator.next();
headerMap.put(header.getKey(), header.getValue());
}
}
// 和mesh交互爽蝴,沒有Trailer沐批,因此不考慮
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
contentStr.append(content.content().toString(StandardCharsets.UTF_8));
if (msg instanceof LastHttpContent) {
// http 響應已經(jīng)讀完
System.out.println("currentSequenceId = " + currentSequenceId + "響應碼 = " + statusCode + " headerMap = " + headerMap + " content = " + contentStr.toString());
ctx.channel().close();
}
}
}
});
}
});
this.b = b;
this.workerGroup = workerGroup;
}
public void request(String path) throws Exception {
try {
// Start the client.
ChannelFuture f = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();
// get 請求
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/get?key=123", Unpooled.EMPTY_BUFFER);
request.headers().set("Sequenceid", 1);
request.headers().set(HttpHeaderNames.HOST, "daemon");
// 發(fā)送http請求
f.channel().writeAndFlush(request);
// post 請求
ChannelFuture f1 = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();
String msg = "hello";
ByteBuf byteBuf = Unpooled.wrappedBuffer(msg.getBytes("UTF-8"));
DefaultFullHttpRequest request1 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/post", byteBuf);
request1.headers().set(HttpHeaderNames.HOST, "daemon");
request1.headers().set("Sequenceid", 2);
request1.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
request1.headers().set(HttpHeaderNames.CONTENT_LENGTH, request1.content().readableBytes());
f1.channel().writeAndFlush(request1);
System.out.println("over ");
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
??其實在工作中很容易遇到一些比較痛苦的事情纫骑,這時候如何考慮優(yōu)化掉這個事情就很重要了,等解決一個非常棘手的事情就會有成就感九孩。感謝您的閱讀先馆,如果感覺我寫的還行,求關(guān)注~