寫在前面
最近一年多一直在做服務(wù)治理相關(guān)的開發(fā)工作. 起初服務(wù)監(jiān)控采用了成本比較低的方式來實現(xiàn)(提供者,消費者自己按分鐘維度上報健康數(shù)據(jù)到Redis,但是這種方式只是在Java的服務(wù)提供者和消費者做到了很好的實現(xiàn), 其他語言目前只能上報很少部分的監(jiān)控數(shù)據(jù)). 因為公司的開發(fā)語言是多樣的, 其中包括: Nodejs, Ruby, Golang, Java, Scala等, 那么將來要對監(jiān)控數(shù)據(jù)的模型拓展, 需求變更等, 將很難快速推廣實現(xiàn). 隨著公司業(yè)務(wù)的高速發(fā)展, 以及將來所有服務(wù)部署Docker化, 服務(wù)的監(jiān)控預(yù)警已經(jīng)是服務(wù)治理工作中的重中之重. 服務(wù)監(jiān)控最好可以同時監(jiān)控基礎(chǔ)服務(wù)(Mysql, Redis等),業(yè)務(wù)服務(wù). 我們的業(yè)務(wù)服務(wù)是采用的Twitter的Finagle-thrift實現(xiàn)多語言之間的RPC調(diào)用. Balabala說了這么多, 就是我們現(xiàn)在要做全鏈路監(jiān)控, 做監(jiān)控首先第一步是需要可以收集到這些網(wǎng)絡(luò)調(diào)用的原始數(shù)據(jù), 這個時候ElasticStack中的Beats項目進(jìn)入了我們的視線, Beats項目中的Packetbeat子項目可以抓取到像Mysql, Redis, Thrift等協(xié)議的數(shù)據(jù)包. 但是,我們業(yè)務(wù)使用的通信協(xié)議是Finagle-thrift, 這里面為了滿足一些拓展(比如:用于RPC調(diào)用鏈跟蹤的Zipkin),Finagle-thrift在原生Thrift上做了二次封裝, 接下來需要讓Packetbeat對Finagle-thrift協(xié)議支持. 下面我將分析過程整理如下, 方便以后溫習(xí)回顧.
Packetbeat項目介紹
更詳細(xì)的請參考 Medcl的一個教程
整個Beats項目都是用的Golang語言開發(fā), Golang這幾天也是現(xiàn)學(xué)現(xiàn)賣, 我在整個調(diào)試過程中沒有找到可以比較方便進(jìn)行Debug的方式, 只能通過fmt.Println進(jìn)行各種調(diào)試信息的輸出, 這個過程比較痛苦. 這里我順便記錄一下怎么配置Go的環(huán)境, 有幾個概念比較懵,在此記錄一下.
安裝GO
在 這里 獲取對應(yīng)的操作系統(tǒng)的GO安裝bao
GOPATH
-
安裝好Go后需要設(shè)置環(huán)境變量,如下:
#這是Go的安裝路徑 export GOROOT=/usr/local/go export GOBIN=$GOROOT/bin #這里可以理解為Go項目的工作空間, 這里允許有多個目錄,注意用":"分割 #當(dāng)有多個GOPATH時,執(zhí)行 go get命令的內(nèi)容默認(rèn)會放在第一個目錄下 export GOPATH=/work/goworkspace
-
GOPATH的的幾個目錄約定
- src 放置Go項目的源碼
- pkg Go項目中使用的第三方包
- bin 編譯后生成的可執(zhí)行文件, 可以把此目錄加入到 PATH 變量中
獲取項目
#創(chuàng)建相應(yīng)目錄
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#簽出源碼
git clone https://github.com/elastic/beats.git
cd beats
#修改官方倉庫為upstream源,設(shè)置自己的倉庫為origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#獲取上游最新的代碼渐扮,如果是剛fork的話可不用管
git pull upstream master
#簽出一個名為finagle的分支林束,用于開發(fā)這個功能
git checkout -b finagle
#切換到packetbeat模塊
cd packetbeat
#獲取依賴信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#編譯
make
yml配置文件說明
interfaces:
#如果提供者消費者在本機,直接寫成lo0
device: en0
protocols:
# 自定義協(xié)議名
finaglethrift:
ports: [20880, 9090, 9091, 9099, 9098]
# 自定義Thrift的Transport type一定要是frame的方式, 否則解析不出來
transport_type: framed
protocol_type: binary
# idl文件一定要有
idl_files: ["test_cfg/result.thrift","test_cfg/order.thrift","test_cfg/hello.thrift"]
output:
elasticsearch:
hosts: ["192.168.10.235:9200"]
kafka:
hosts: ["192.168.5.159:9092"]
topic: "packetbeat_test_qqq"
shipper:
logging:
files:
path: /tmp/mybeat
需要修改哪些文件
-
新增協(xié)議目錄, packetbeat啟動時會自動掃描protos目錄下的協(xié)議包
因為是要對Thrift協(xié)議進(jìn)行拓展, 所以之前很多代碼是可以復(fù)用的, 直接將原來的thrift目錄在當(dāng)前目錄下復(fù)制一份, 直接改名為finaglethrift
-
文件修改
為了便于區(qū)分, 我們將原來所有文件名中的thrift變更為finaglethrift, 變更之后我們只需要修改finaglethrift.go文件即可.-
將包名從thrift變更為finaglethrift
-
修改協(xié)議注冊名,這里的名稱直接匹配yml配置文件中的協(xié)議名
-
- 協(xié)議解析的具體方法修改, 主要業(yè)務(wù)抓包分析將在這個方法中完成,我們本次改動也是針對這個方法的修改
原生Thrift簡單分析
通訊協(xié)議格式
TCompactProtocol
-
TBinaryProtocol(我們主要采用這種格式進(jìn)行通訊)
TBinaryProtocol下通信方式采用TFramedTransport,即以幀的方式對數(shù)據(jù)進(jìn)行傳輸注意: 服務(wù)端, 服務(wù)端需要采用Framed的方式進(jìn)行通信, packetbeat采用Framed的方式進(jìn)行抓包分析, 如果thrift的傳輸方式不是這種方式, packetbeat將解析不出
TJSONProtocol
核心模型
-
TTransport, 這是一個基類,我們使用的傳輸方式是Framed, 那么直接使用的TFramedTransport將繼承TTransport. TFramedTransport會將數(shù)據(jù)寫入到一個buf中, 等全部寫完之后會調(diào)用flush方法,首先計算出buf中的數(shù)據(jù)長度,將4個字節(jié)的幀長度和數(shù)據(jù)內(nèi)容進(jìn)行封裝進(jìn)行發(fā)送. 針對解析方怎么判斷是否解析完,都是通過發(fā)送的data中頭四個字節(jié)判斷.具體如下圖:
具體封裝源代碼:
@Override public void flush() throws TTransportException { byte[] buf = writeBuffer_.get(); int len = writeBuffer_.len(); writeBuffer_.reset(); # 封裝成 4個字節(jié) + 幀內(nèi)容 encodeFrameSize(len, i32buf); transport_.write(i32buf, 0, 4); transport_.write(buf, 0, len); transport_.flush(); }
-
TProtocol, 協(xié)議接口, 我們主要是采用TBinaryProtocol的協(xié)議類進(jìn)行通信, 其中實現(xiàn)了接口中的操作協(xié)議的方法. TBinaryProtocol需要為消息體封裝一個Header, 其中還定義了Thrift中的讀寫模式(這里很重要,如果模式不匹配將無法正常解析),主要分為: 嚴(yán)謹(jǐn)?shù)淖x寫, 普通讀寫. 因為我們主要針對嚴(yán)謹(jǐn)讀寫模式進(jìn)行抓包分析, 下面將重點解析一下在嚴(yán)謹(jǐn)讀寫模式下的消息體內(nèi)容都是什么, 具體如下圖:
2016-04-30_11-39-09
在TBinaryProtocol中有對消息體的讀取和寫入操作, 具體代碼如下:
public TMessage readMessageBegin() throws TException {
int size = readI32();
if (size < 0) {
int version = size & VERSION_MASK;
if (version != VERSION_1) {
throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
}
return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
} else {
if (strictRead_) {
throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
}
return new TMessage(readStringBody(size), readByte(), readI32());
}
}
public void writeMessageBegin(TMessage message) throws TException {
if (strictWrite_) {
int version = VERSION_1 | message.type;
writeI32(version);
writeString(message.name);
writeI32(message.seqid);
} else {
writeString(message.name);
writeByte(message.type);
writeI32(message.seqid);
}
}
/**
* Message type constants in the Thrift protocol.
*
*/
public final class TMessageType {
public static final byte CALL = 1;
public static final byte REPLY = 2;
public static final byte EXCEPTION = 3;
public static final byte ONEWAY = 4;
}
- TMessage, 服務(wù)提供者,消費者在進(jìn)行RPC通信時都會講傳遞的數(shù)據(jù)封裝成TMessage, 主要包含三部分
- 名稱
- 序號
- 類型
Finagle-thrift協(xié)議分析
因為Finagle-thrift是在Thrift協(xié)議之上做了封裝, 我們主要對著兩個協(xié)議中具體的數(shù)據(jù)進(jìn)行比對.
測試數(shù)據(jù)IDL
為了讓測試具有代表性, 構(gòu)建的IDL文件中既有簡單的沒有入?yún)?返回值的finaglePing方法, 也有有入?yún)?復(fù)雜返回值的detail方法
include "result.thrift"
/*訂單*/
struct Order {
1:i32 userId
/*買家*/
2:string userName,
/*訂單ID*/
3:string orderId,
}
struct OrderResult {
1:result.Result result,
2:optional Order order
}
service OrderServ{
/*訂單詳情*/
OrderResult detail(1:i32 userId, 2:string userName, 3:string orderId)
void finaglePing()
}
/************************復(fù)雜返回值Result的定義***************************/
struct FailDesc {
1:string name,
2:string failCode,
3:string desc
}
struct Result {
1:i32 code,
2:optional list<FailDesc> failDescList
}
struct StringResult {
1:Result result,
2:optional string value,
3:optional string extend
}
一次RPC調(diào)用的差異
原生Thrift調(diào)用
我們針對finaglePing方法通過原生Thrift進(jìn)行一次RPC調(diào)用,并在Client端TcpDump出產(chǎn)生的數(shù)據(jù)包
從圖上可以看出,包含了3次握手, 1次Client與Server的業(yè)務(wù)請求交互, 4次揮手關(guān)閉連接.
下面我們看Client發(fā)送請求時的具體數(shù)據(jù)包內(nèi)容如下圖:
這里包含數(shù)據(jù)長度, Thrift是否是嚴(yán)謹(jǐn)讀寫,消息類型, 消息內(nèi)容等信息.
Fiangle-thrift調(diào)用及分析
我們針對finaglePing方法同樣通過Fiangle-thrift方式進(jìn)行一次RPC調(diào)用,并在Client端TcpDump出產(chǎn)生的數(shù)據(jù)包
從上圖看出, 一次RPC調(diào)用包含了, 3次握手, 1次fiangle確認(rèn)協(xié)議的請求交互, 1次Client與Server的業(yè)務(wù)請求交互, 4次揮手關(guān)閉連接.
關(guān)于Client發(fā)送的請求和原生Thrift還不太一樣, 在創(chuàng)建完連接之后, 需要發(fā)送
一次帶有__can__finagle__trace__v3__
信息的請求已確認(rèn)是否是Finagle-thrift協(xié)議, 確認(rèn)成功之后才會進(jìn)行真正的業(yè)務(wù)交互, 這次確認(rèn)是一次標(biāo)準(zhǔn)的Thrift通信,具體如下圖:
下面是在確認(rèn)Fiangle標(biāo)識之后進(jìn)行的真正的業(yè)務(wù)通信,具體如下圖:
我們上面這張圖中可以看出在標(biāo)準(zhǔn)的Thrift協(xié)議數(shù)據(jù)之前Finagle-thrift自己又加了很多自己的數(shù)據(jù),具體加了什么, 我們來看一下Fiangle的源碼, 具體如下:
```
/**
* ThriftClientFramedCodec implements a framed thrift transport that
* supports upgrading in order to provide TraceContexts across
* requests.
*/
object ThriftClientFramedCodec {
/**
* Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodecFactory]].
* Passing a ClientId will propagate that information to the server iff the server is a finagle
* server.
*/
def apply(clientId: Option[ClientId] = None) =
new ThriftClientFramedCodecFactory(clientId)
def get() = apply()
}
class ThriftClientFramedCodecFactory(
clientId: Option[ClientId],
_useCallerSeqIds: Boolean,
_protocolFactory: TProtocolFactory)
extends CodecFactory[ThriftClientRequest, Array[Byte]]#Client {
def this(clientId: Option[ClientId]) = this(clientId, false, Protocols.binaryFactory())
def this(clientId: ClientId) = this(Some(clientId))
// Fix this after the API/ABI freeze (use case class builder)
def useCallerSeqIds(x: Boolean): ThriftClientFramedCodecFactory =
new ThriftClientFramedCodecFactory(clientId, x, _protocolFactory)
/**
* Use the given protocolFactory in stead of the default `TBinaryProtocol.Factory`
*/
def protocolFactory(pf: TProtocolFactory) =
new ThriftClientFramedCodecFactory(clientId, _useCallerSeqIds, pf)
/**
* Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodec]]
* with a default TBinaryProtocol.
*/
def apply(config: ClientCodecConfig) =
new ThriftClientFramedCodec(_protocolFactory, config, clientId, _useCallerSeqIds)
}
class ThriftClientFramedCodec(
protocolFactory: TProtocolFactory,
config: ClientCodecConfig,
clientId: Option[ClientId] = None,
useCallerSeqIds: Boolean = false
) extends Codec[ThriftClientRequest, Array[Byte]] {
private[this] val preparer = ThriftClientPreparer(
protocolFactory, config.serviceName,
clientId, useCallerSeqIds)
def pipelineFactory: ChannelPipelineFactory =
ThriftClientFramedPipelineFactory
override def prepareConnFactory(
underlying: ServiceFactory[ThriftClientRequest, Array[Byte]]
) = preparer.prepare(underlying)
override val protocolLibraryName: String = "thrift"
}
```
Scala源碼看起來太費勁, 既然知道了原理, 為了可以解析出具體的Fiangle-thrift中的東西, 我只需要設(shè)置FrameSize和data的offset的位置, 獲取到原生的Thrift協(xié)議中的Framed數(shù)據(jù)即可, 然后復(fù)用Packetbeat自帶的針對Thrift協(xié)議包的抓取與組合邏輯.
通過比對兩個業(yè)務(wù)包我知道中間Fiangle-thrift自己添加的信息字節(jié)大小固定為129個字節(jié),這只是Client在發(fā)送請求時才會添加這些附加信息, Server端返回值則是在原生Thrift協(xié)議中添加了1個字節(jié), 其中我還需要排除創(chuàng)建連接之后發(fā)送的Finagle-thrift協(xié)議確認(rèn)請求.
我們完成了普通Finagle-thrift協(xié)議的解析,接下來還要針對附帶Zipkin信息的Finagle-thrift協(xié)議的解析, Zipkin是參考Google的Dapper完成的可以對RPC調(diào)用鏈進(jìn)行跟蹤的框架, 這已經(jīng)是業(yè)內(nèi)針對分布式系統(tǒng)之間RPC調(diào)用鏈跟蹤的通用解決方案. Zipkin無非就是在RPC調(diào)用時多傳輸了TraceId, SpanId, ParentSpanId, IsSample等信息, 通過下面的Zipkin源碼可以確定這些信息的大小也是固定字節(jié),并且大小為4個字節(jié). Zipkin關(guān)于這塊的源碼如下:
```
/**
* The wire format is (big-endian):
* ''spanId:8 parentId:8 traceId:8 flags:8''
*/
def tryUnmarshal(body: Buf): Try[TraceId] = {
if (body.length != 32)
return Throw(new IllegalArgumentException("Expected 32 bytes"))
val bytes = local.get()
body.write(bytes, 0)
val span64 = ByteArrays.get64be(bytes, 0)
val parent64 = ByteArrays.get64be(bytes, 8)
val trace64 = ByteArrays.get64be(bytes, 16)
val flags64 = ByteArrays.get64be(bytes, 24)
val flags = Flags(flags64)
val sampled = if (flags.isFlagSet(Flags.SamplingKnown)) {
if (flags.isFlagSet(Flags.Sampled)) someTrue else someFalse
} else None
val traceId = TraceId(
if (trace64 == parent64) None else Some(SpanId(trace64)),
if (parent64 == span64) None else Some(SpanId(parent64)),
SpanId(span64),
sampled,
flags)
Return(traceId)
}
```
下面是Fiangle-thrift針對Zipkin關(guān)閉和開啟的一個抓包對比圖:
根據(jù)上面的分析邏輯,我就可以在Packetbeat中的messageParser方法中通過一些字節(jié)特征修正FrameSize和data的offset來把數(shù)據(jù)包變成原生的Thrift協(xié)議數(shù)據(jù)包, 具體代碼如下:
func (thrift *Thrift) messageParser(s *ThriftStream) (bool, bool) {
var ok, complete bool
var m = s.message
for s.parseOffset < len(s.data) {
dataStr := string(s.data)
switch s.parseState {
case ThriftStartState:
m.start = s.parseOffset
if thrift.TransportType == ThriftTFramed {
if len(s.data) < 4 {
return true, false
}
frameSize := common.Bytes_Ntohl(s.data[:4])
m.FrameSize = frameSize
s.parseOffset = 4
if (!strings.Contains(dataStr, "__can__finagle__trace__v3__")) {
var thriftFlagIndex1 int = bytes.LastIndex(s.data, thriftFlag1)
if thriftFlagIndex1> -1 {// 如果標(biāo)識為80010001 那么代表是client->server
// client -> server
m.FrameSize = common.Bytes_Ntohl(s.data[:4]) - uint32(thriftFlagIndex1) - 4 // 從8001位置之后開始
s.parseOffset = thriftFlagIndex1// 從8001位置開始(包括8001位置)
}else{//如果沒有標(biāo)識為80010001, 那么應(yīng)該有標(biāo)識位80010002, 那么代表是server->client
// finagle 返回值
if bytes.LastIndex(s.data, thriftFlag2)==5 {
m.FrameSize = frameSize - 1
s.parseOffset = 4 + 1
}
}
}
}
... ...
}