AMQPConnection
實(shí)例初始化
創(chuàng)建Connection時會通過FrameHandlerFacotry創(chuàng)建一個SocketFrameHandler遣蚀,SocketFrameHandler對Socket進(jìn)行了封裝。
public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
{
checkPreconditions();
this.username = params.getUsername();
this.password = params.getPassword();
this._frameHandler = frameHandler;
this._virtualHost = params.getVirtualHost();
this._exceptionHandler = params.getExceptionHandler();
this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
this.requestedFrameMax = params.getRequestedFrameMax();
this.requestedChannelMax = params.getRequestedChannelMax();
this.requestedHeartbeat = params.getRequestedHeartbeat();
this.shutdownTimeout = params.getShutdownTimeout();
this.saslConfig = params.getSaslConfig();
this.executor = params.getExecutor();
this.threadFactory = params.getThreadFactory();
this._channelManager = null;
this._brokerInitiatedShutdown = false;
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
}
啟動連接
初始化WorkService和HeartBeatSender兢卵。
創(chuàng)建一個channel0的AMQChannel,這個channel不會被ChannelManager管理盐茎。
首先channel0會將一個BlockingRpcContinuation作為當(dāng)前未完成的Rpc請求袭蝗,用于接收handshake的響應(yīng)。
然后channel0會向socket中寫入一條只有header的消息作為handshake斋攀,header中包含了客戶端的版本號。
緊接著會啟動主循環(huán)線程梧田,主循環(huán)會通過SocketFrameHandler從socket接收字節(jié)流淳蔼。此時接收到的第一條數(shù)據(jù)是服務(wù)端響應(yīng)handshake返回的Connection.Start信息(包含服務(wù)端版本、機(jī)制裁眯、基礎(chǔ)信息)鹉梨。
主循環(huán)線程啟動后,主線程會阻塞地等待服務(wù)端的handshake響應(yīng)穿稳。拿到響應(yīng)之后會對服務(wù)器回傳的信息進(jìn)行比對存皂,然后發(fā)送Connection.StartOK的信息去服務(wù)端(這個請求也還是阻塞式的),等待服務(wù)端回傳Connection.Tune(包含最大channel數(shù)逢艘、最大frame長度和heartbeat間隔)旦袋。將這些信息與實(shí)例初始化是的設(shè)置信息進(jìn)行對比,初始化ChannelManager
緊接著發(fā)送Connection.TuneOk和Connection.Open消息去服務(wù)端它改,完成connection的建立疤孕。
Connection > MainLoop > readFrame
消息體
Frame是對AMQP消息的封裝:包含frame的type、channel號搔课、消息內(nèi)容
type|channelNumber|payloadSize|payload|frameEndMarker
Payload包含了消息類型胰柑、消息頭和消息主題
method|header|body
消息發(fā)送和接收
消息的發(fā)送和接收都要channel來完成。
創(chuàng)建Channel
通過Connection的ChannelManager來創(chuàng)建Channel爬泥,通過指定的ChannelNumber或者由分配器分配。創(chuàng)建好的Channel實(shí)例會放入ChannelManager的Map中崩瓤,key為ChannelNumber袍啡。由此可見Channel是Connection唯一的。
public ChannelN createChannel(AMQConnection connection);
public ChannelN createChannel(AMQConnection connection, int channelNumber);
private ChannelN addNewChannel(AMQConnection connection, int channelNumber);
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService);
Channel實(shí)例化之后會調(diào)用Channel.open方法却桶,發(fā)送Channel.Open去服務(wù)端(阻塞式)境输,等待服務(wù)端響應(yīng)Channel.OpenOk蔗牡。
消息發(fā)送
Channel.transmit 發(fā)送消息,調(diào)用AMQCommand.transmit完成發(fā)送嗅剖。
AMQCommand.transmit將消息封裝成Frame辩越,通過connection的SocketFrameHandler寫入OutpuStream。
消息接收
主循環(huán)線程在鏈接創(chuàng)建完成后會監(jiān)聽socket信粮,從InputStream中讀取二進(jìn)制流封裝成Frame黔攒。通過Frame中的ChannelNumber從ChannelManager中獲取對應(yīng)的Channel實(shí)例處理Frame。
while (_running) {
Frame frame = _frameHandler.readFrame();
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
// Ignore it: we've already just reset the heartbeat counter.
} else {
if (frame.channel == 0) { // the special channel
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
// If we're still _running, but not isOpen(), then we
// must be quiescing, which means any inbound frames
// for non-zero channels (and any inbound commands on
// channel zero that aren't Connection.CloseOk) must
// be discarded.
ChannelManager cm = _channelManager;
if (cm != null) {
cm.getChannel(frame.channel).handleFrame(frame);
}
}
}
}
} else {
// Socket timeout waiting for a frame.
// Maybe missed heartbeat.
handleSocketTimeout();
}
}
Channel會使用已經(jīng)準(zhǔn)備好的AMQCommand處理Frame强缘,并未下一個Frame準(zhǔn)備新的AMQCommand。
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
AMQCommad會使用CommandAssembler依次從Frame的payload中檢出對應(yīng)的Method、Header和Body凌净。如果檢出了Body什荣,整個Frame會被檢出完成。如過未完成商虐,會進(jìn)入主循環(huán)再次處理直至完成觉阅。
public synchronized boolean handleFrame(Frame f) throws IOException
{
switch (this.state) {
case EXPECTING_METHOD: consumeMethodFrame(f); break;
case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;
case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;
default:
throw new AssertionError("Bad Command State " + this.state);
}
return isComplete();
}
Frame被檢出完后,會根據(jù)Method的類型進(jìn)入不同的異步處理流程秘车。
Method在channel打開和關(guān)閉的情況下會以下的可能:
Channel打開:Basic.Deliver, Basic.Return, Basic.Flow, Basic.Ack, Basic.Nack, Basic.RecoveryOk, Basic.Cancel
Channel關(guān)閉:Channel.CloseOk
生產(chǎn)和消費(fèi)
生產(chǎn)
調(diào)用Channel.basicPublish()方法典勇,指定exchange、routingKey等信息鲫尊,消息屬性痴柔、消息體。封裝成Baisc.Publish疫向,放入AMQCommand咳蔚,最后調(diào)用transmit方法完成發(fā)送。參考消息發(fā)送
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
BasicProperties useProps = props;
if (props == null) {
useProps = MessageProperties.MINIMAL_BASIC;
}
transmit(new AMQCommand(new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(),
useProps, body));
}
消費(fèi)
創(chuàng)建QueueingConsumer實(shí)例搔驼,然后調(diào)用Channel.basicConsume方法谈火。
queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("queue_name", queueingConsumer);
new Thread() {
@Override
public void run() {
while (true) {
try {
final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
new Thread() {
@Override
public void run() {
try{
delivery.getEnvelope();//消息頭
delivery.getProperties();//消息屬性
delivery.getBody();//消息體
}finally{
//channel.basicAck();
//channel.basicNack()
}
}
}.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
QueueingConsumer實(shí)現(xiàn)了Consumer接口舌涨。
Channel.basicConsume方法會封裝Channel.Consume消息發(fā)送到服務(wù)端(阻塞式)糯耍,等待服務(wù)端的Channel.ConsumeOk響應(yīng)(包含了服務(wù)端為Consumer分配的ConsumerTag)。然后將QueueingConsumer放入Map中囊嘉,key為ConsumerTag温技。consumer是Channel唯一。
當(dāng)客戶端接收到消息扭粱,參考消息接收舵鳞。Basic.Deliver類型的消息(consumerTag、deliveryTag琢蛤、redelivered蜓堕、exchange抛虏、routingKey)會進(jìn)入消費(fèi)處理流程。Channel根據(jù)ConsumerTag從Map中獲取對應(yīng)的QueueConsumer實(shí)例套才,由Channel的ConsumerDispatcher通過Connection初始化的WorkService創(chuàng)建新的處理線程迂猴,調(diào)用QueueConsumer實(shí)例的handleDelivery方法。QueueConsumer將消息封裝成Delivery對象背伴,放入BlockingQueue中沸毁。
消費(fèi)線程等待新的Delivery(阻塞式),之后創(chuàng)建新的線程完成消息的處理挂据。