進程重啟端口號沖突了明棍,太坑了吧

??相信大家在日常工作中會遇到這種情況,一臺機器上兩個進程通過socket相互通信寇僧,server端重啟后端口沖突直接起不來了...此時rd同學心里一緊摊腋,是不是寫出bug了趕緊查日志發(fā)現(xiàn)是端口沖突了筆者目前在貓眼從事servicemesh相關(guān)研發(fā)工作,我們公司使用新美大的機器婉宰,無法擁有對機器的控制權(quán)歌豺,因此也就無法基于k8s來做servicemesh。簡單來說心包,sdk通過http的方式與mesh交互做服務注冊發(fā)現(xiàn)类咧,那sdk如何發(fā)現(xiàn)mesh 就成了一個問題。

image

??如上圖所示蟹腾,最開始sdk通過本地文件發(fā)現(xiàn)mesh的admin地址痕惋,mesh重啟后為防止端口沖突問題,會選一個未占用的端口進行監(jiān)聽娃殖,然后把最新的admin地址寫到本地文件值戳。sdk要能及時的刷新mesh最新的admin地址,所以sdk內(nèi)部要有一個定時任務定期刷新admin地址炉爆。由此來看sdk的邏輯就變得非常重了堕虹,sdk本身的定位就是序列化和反序列化數(shù)據(jù)發(fā)送給mesh。那么有什么好的方法能解決上面這么惡心的問題嗎芬首?當然有啦赴捞,通過uds就可以。
??Unix domain socket 又叫 IPC(inter-process communication 進程間通信) socket郁稍,用于實現(xiàn)同一主機上的進程間通信赦政。socket 原本是為網(wǎng)絡通訊設計的,但后來在 socket 的框架上發(fā)展出一種 IPC 機制耀怜,就是 UNIX domain socket恢着。雖然網(wǎng)絡 socket 也可用于同一臺主機的進程間通訊(通過 loopback 地址 127.0.0.1)桐愉,但是 UNIX domain socket 用于 IPC 更有效率:不需要經(jīng)過網(wǎng)絡協(xié)議棧,不需要打包拆包掰派、計算校驗和从诲、維護序號和應答等,只是將應用層數(shù)據(jù)從一個進程拷貝到另一個進程碗淌。這是因為盏求,IPC 機制本質(zhì)上是可靠的通訊,而網(wǎng)絡協(xié)議是為不可靠的通訊設計的亿眠。有了uds進行本機通信碎罚,再也不用擔心mesh重啟或啟動時端口沖突的問題了。使用uds之后sdk與mesh的交互方式纳像。這樣sdk直接拿到uds路徑new uds客戶端調(diào)用mesh就可以了荆烈,很方便

image

??sdk的語言有多種,我拿Java來舉例竟趾。mesh 使用go語言實現(xiàn)憔购,很容易實現(xiàn)一個應用層http協(xié)議傳輸層uds協(xié)議的server。

func main() {
  RunServer()
}
?
var (
  // 聲明 Unix 套接字的地址
  serverAddr = &net.UnixAddr{Name: "/opt/test.sock", Net: "unix"}
)
?
func RunServer() {
  // unlink 系統(tǒng)調(diào)用比較特殊岔帽。關(guān)于它的描述中有一點:如果這個文件是一個 unix socket玫鸟,它會被移除,但是打開它的進程可以繼續(xù)使用它犀勒。也就是說新舊進程都會在這個地址監(jiān)聽屎飘。
  syscall.Unlink(serverAddr.Name)
  lis, err := net.ListenUnix("unix", serverAddr)
  if err != nil {
    fmt.Println("ListenUnix", err)
    return
  }
  http.HandleFunc("/get", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    query := req.URL.Query()
    get := query.Get("key")
    fmt.Printf("server get key = %s value = %s \n", "key", get)
    builder := strings.Builder{}
    for i := 0; i < 3; i++ {
      builder.WriteString(strconv.Itoa(i))
    }
    s := req.Header.Get("sequenceid")
    w.Header().Add("sequenceid", s)
    w.Write([]byte(builder.String()))
  }))
?
  http.HandleFunc("/post", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    bytes, err := ioutil.ReadAll(req.Body)
    req.Body.Close()
    if err != nil {
      return
    }
    h := req.Header.Get("sequenceid")
    contentType := req.Header.Get("Content-Type")
    fmt.Println(contentType)
    w.Header().Add("sequenceid", h)
    fmt.Printf("server post receive request %s \n", string(bytes))
    w.Write([]byte("post success"))
  }))
  svr := &http.Server{Handler: http.DefaultServeMux}
  err = svr.Serve(lis)
  if err != nil {
    fmt.Println("Serve err:", err)
  }
}
?

??而Java語言就相對麻煩了,在做這一塊的時候在網(wǎng)上調(diào)研沒有找到什么資料贾费,最終使用netty封裝了一個HTTPUdsClient的包钦购。因為netty是異步的,所以要把異步轉(zhuǎn)同步褂萧,這里只是給出一個簡單的demo并沒有轉(zhuǎn)同步押桃。那么異步轉(zhuǎn)同步如何實現(xiàn)呢?我的做法是在http header中添加Sequenceid导犹,channel中發(fā)完數(shù)據(jù)后使用CountDownLatch wait 等待唱凯,當mesh 返回數(shù)據(jù)后解碼進入到Handler中,觸發(fā)CountDownLatch 的countDown操作谎痢,很容易就異步轉(zhuǎn)同步了波丰。還有一點說明:每次http 響應收到后都會把uds連接關(guān)閉掉。

public class NettyUdsHttpClient {

    public static void main(String[] args) throws Exception {
        final NettyUdsHttpClient nettyUdsHttpClient = new NettyUdsHttpClient();
        nettyUdsHttpClient.request("/opt/test.sock");
    }

    private Bootstrap b = null;

    private static EventLoopGroup workerGroup = null;

    public NettyUdsHttpClient() {
        EventLoopGroup workerGroup = null;
        Class domainSocketChannelClazz = null;
        if (Epoll.isAvailable()) {
            domainSocketChannelClazz = EpollDomainSocketChannel.class;
            System.out.println("Epoll.isAvailable");
            workerGroup = new EpollEventLoopGroup(1);
        } else if (KQueue.isAvailable()) {
            System.out.println("KQueue.isAvailable");
            workerGroup = new KQueueEventLoopGroup(1);
            domainSocketChannelClazz = KQueueDomainSocketChannel.class;
        } else {
            System.out.println("use NioEventLoopGroup");
            workerGroup = new NioEventLoopGroup(1);
            domainSocketChannelClazz = NioSocketChannel.class;
        }

        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(domainSocketChannelClazz);
        b.option(ChannelOption.SO_KEEPALIVE, false);
        b.handler(new ChannelInitializer<DomainSocketChannel>() {
            @Override
            public void initChannel(DomainSocketChannel ch) throws Exception {
                // 客戶端接收到的是httpResponse響應舶得,所以要使用HttpResponseDecoder進行解碼
                ch.pipeline().addLast(new HttpResponseDecoder());

                // 客戶端發(fā)送的是httprequest,所以要使用HttpRequestEncoder進行編碼
                ch.pipeline().addLast(new HttpRequestEncoder());
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {

                    private Map<String, String> headerMap = new HashMap<>();

                    private int statusCode;

                    private StringBuilder contentStr = new StringBuilder();

                    private int currentSequenceId;

                    @Override
                    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                        if (msg instanceof HttpResponse) {
                            DefaultHttpResponse response = (DefaultHttpResponse) msg;
                            this.statusCode = response.status().code();
                            HttpHeaders headers = response.headers();
                            Integer sequenceId = headers.getInt("Sequenceid");
                            if (sequenceId != null) {
                                this.currentSequenceId = sequenceId;
                            }
                            Iterator<Map.Entry<String, String>> headerIterator = headers.iteratorAsString();
                            // 封裝header
                            while (headerIterator.hasNext()) {
                                Map.Entry<String, String> header = headerIterator.next();
                                headerMap.put(header.getKey(), header.getValue());
                            }
                        }
                        // 和mesh交互爽蝴,沒有Trailer沐批,因此不考慮
                        if (msg instanceof HttpContent) {
                            HttpContent content = (HttpContent) msg;
                            contentStr.append(content.content().toString(StandardCharsets.UTF_8));
                            if (msg instanceof LastHttpContent) {
                                // http 響應已經(jīng)讀完
                                System.out.println("currentSequenceId = " + currentSequenceId + "響應碼 = " + statusCode + " headerMap = " + headerMap + " content = " + contentStr.toString());
                                ctx.channel().close();
                            }
                        }
                    }
                });
            }
        });
        this.b = b;
        this.workerGroup = workerGroup;
    }

    public void request(String path) throws Exception {
        try {
            // Start the client.
            ChannelFuture f = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            // get 請求
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                    "/get?key=123", Unpooled.EMPTY_BUFFER);

            request.headers().set("Sequenceid", 1);
            request.headers().set(HttpHeaderNames.HOST, "daemon");
            // 發(fā)送http請求
            f.channel().writeAndFlush(request);

            // post 請求
            ChannelFuture f1 = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            String msg = "hello";
            ByteBuf byteBuf = Unpooled.wrappedBuffer(msg.getBytes("UTF-8"));

            DefaultFullHttpRequest request1 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/post", byteBuf);
            request1.headers().set(HttpHeaderNames.HOST, "daemon");
            request1.headers().set("Sequenceid", 2);
            request1.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
            request1.headers().set(HttpHeaderNames.CONTENT_LENGTH, request1.content().readableBytes());
            f1.channel().writeAndFlush(request1);
            System.out.println("over ");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

??其實在工作中很容易遇到一些比較痛苦的事情纫骑,這時候如何考慮優(yōu)化掉這個事情就很重要了,等解決一個非常棘手的事情就會有成就感九孩。感謝您的閱讀先馆,如果感覺我寫的還行,求關(guān)注~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末躺彬,一起剝皮案震驚了整個濱河市煤墙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌宪拥,老刑警劉巖仿野,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異她君,居然都是意外死亡脚作,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門缔刹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來球涛,“玉大人,你說我怎么就攤上這事校镐∫诒猓” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵鸟廓,是天一觀的道長从祝。 經(jīng)常有香客問我,道長肝箱,這世上最難降的妖魔是什么哄褒? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮煌张,結(jié)果婚禮上呐赡,老公的妹妹穿的比我還像新娘。我一直安慰自己骏融,他們只是感情好链嘀,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著档玻,像睡著了一般怀泊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上误趴,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天霹琼,我揣著相機與錄音,去河邊找鬼。 笑死枣申,一個胖子當著我的面吹牛售葡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播忠藤,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼挟伙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了模孩?” 一聲冷哼從身側(cè)響起尖阔,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎榨咐,沒想到半個月后介却,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡祭芦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年筷笨,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片龟劲。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡胃夏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出昌跌,到底是詐尸還是另有隱情仰禀,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布蚕愤,位于F島的核電站答恶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏萍诱。R本人自食惡果不足惜悬嗓,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望裕坊。 院中可真熱鬧包竹,春花似錦、人聲如沸籍凝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽饵蒂。三九已至声诸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間退盯,已是汗流浹背彼乌。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工泻肯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人囤攀。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓软免,卻偏偏與公主長得像,于是被迫代替她去往敵國和親焚挠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354