PulsarClient
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Let's look at the main methods of this class (a short usage sketch follows the list):
- creating producers / consumers / readers
- metadata-related methods
- transaction-related methods
- the close methods
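A minimal sketch of those entry points; the topic and subscription names here are placeholders, not from the original:
// given the client built above
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")                  // placeholder topic
        .create();
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-sub")         // placeholder subscription
        .subscribe();
Reader<byte[]> reader = client.newReader()
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .create();
// metadata: list the partitions of a topic
client.getPartitionsForTopic("my-topic").get();
// close() shuts down all producers/consumers and the connection pool
client.close();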
ClientBuilder
The builder methods are used to pass configuration into the PulsarClient.
Supported options (a builder example follows this list):
- Connection settings:
  - service address: serviceUrl / serviceUrlProvider / listener / proxyServiceUrl
  - operation timeout: operationTimeout
- TCP settings:
  - tcpNoDelay
  - keepAliveInterval
  - connection establishment timeout: connectionTimeout
  - how many connections to open per broker
  - request retry policy (how long to back off after a failed request)
- Lookup request settings:
  - lookup request concurrency
  - maximum number of redirects
  - maximum number of rejected requests per connection
- Thread counts:
  - ioThreads
  - listenerThreads
- TLS + authentication
- Transactions
- Metrics
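A minimal sketch exercising a few of these options through the public ClientBuilder API; the values are arbitrary examples, not recommendations:
import java.util.concurrent.TimeUnit;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .operationTimeout(30, TimeUnit.SECONDS)        // operation timeout
        .connectionTimeout(10, TimeUnit.SECONDS)       // TCP connect timeout
        .enableTcpNoDelay(true)
        .keepAliveInterval(30, TimeUnit.SECONDS)
        .connectionsPerBroker(1)                       // connections per broker
        .maxConcurrentLookupRequests(5000)             // lookup concurrency
        .maxLookupRedirects(20)                        // max redirects
        .maxNumberOfRejectedRequestPerConnection(50)   // rejected-request limit
        .ioThreads(4)
        .listenerThreads(4)
        .build();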
ClientBuilder.build() simply passes the assembled configuration into the PulsarClientImpl constructor.
Let's see what happens in there.
PulsarClientImpl
package org.apache.pulsar.client.impl;
public class PulsarClientImpl implements PulsarClient {
// lookup service
private LookupService lookup;
// connection pool
private final ConnectionPool cnxPool;
// Netty's HashedWheelTimer, used to schedule delayed operations
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorService;
// current state of this PulsarClient
private AtomicReference<State> state = new AtomicReference<>();
// all the business-logic units (client-side logic)
private final Set<ProducerBase<?>> producers;
private final Set<ConsumerBase<?>> consumers;
// id generators
private final AtomicLong producerIdGenerator = new AtomicLong();
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong requestIdGenerator = new AtomicLong();
// the EventLoopGroup here appears to be used only as a thread pool:
// 0. in ConnectionPool, as the connection I/O thread pool (standard Netty client usage)
// 1. in the Consumer, to periodically flush the PersistentAcknowledgmentsGroupingTracker
// 2. in the Producer, to periodically generate encryption keys
// 3. as a constructor argument for AsyncHttpClient
private final EventLoopGroup eventLoopGroup;
// schema cache
private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;
// used by producers to generate the publish time
private final Clock clientClock;
@Getter
private TransactionCoordinatorClientImpl tcClient;
The constructor of this class mainly initializes these key fields; nothing else special happens there.
LookupService
Depending on the configuration, either HttpLookupService or BinaryProtoLookupService is selected.
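A sketch of the selection logic in the PulsarClientImpl constructor, paraphrased from the 2.x sources; the exact constructor parameters vary between versions:
// an http(s):// service URL selects HTTP lookup; otherwise lookups go over
// the binary protocol on the broker connection
if (conf.getServiceUrl().startsWith("http")) {
    lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
    lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(),
            externalExecutorProvider.getExecutor());
}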
ConnectionPool
Let's look at ConnectionPool first.
package org.apache.pulsar.client.impl;
public class ConnectionPool implements Closeable {
// the connection pool itself:
// address -> connection index -> connection
// if maxConnectionsPerHosts is configured to 0, pooling is disabled
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
// Netty-related, passed in from the PulsarClient
private final EventLoopGroup eventLoopGroup;
private final Bootstrap bootstrap;
private final PulsarChannelInitializer channelInitializerHandler;
protected final DnsNameResolver dnsResolver;
// configuration
private final ClientConfigurationData clientConfig;
private final int maxConnectionsPerHosts;
// whether this is a Server Name Indication proxy; TLS-related, ignored for now
private final boolean isSniProxy;
The constructor mostly initializes these members in the standard way for a Netty network client:
bootstrap = new Bootstrap();
// bind the I/O thread pool
bootstrap.group(eventLoopGroup);
// configure the channel type; if Epoll is supported, an Epoll channel is used
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
// set the TCP connect timeout
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
// set TCP_NODELAY
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
// configure the allocator
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
// attach the channel initializer
channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
bootstrap.handler(channelInitializerHandler);
// this resolver is provided by Netty for DNS resolution; more on it later
this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
}
The buffer allocator passed in here (PulsarByteBufAllocator.DEFAULT) is a custom one.
The main responsibilities of this connection pool:
- create and cache connections
- release connections back to the pool
- limit the number of connections per host according to maxConnectionsPerHosts
For concrete usage, see the test class org.apache.pulsar.client.impl.ConnectionPoolTest:
ConnectionPool pool;
InetSocketAddress brokerAddress = ....;
// get a connection; if none exists yet, one is created
CompletableFuture<ClientCnx> conn = pool.getConnection(brokerAddress);
ClientCnx cnx = conn.get();
// use the connection
...
// hand it back to the pool
pool.releaseConnection(cnx);
pool.closeAllConnections();
pool.close();
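The keying behavior is worth a closer look. A paraphrased sketch of getConnection, based on the 2.x sources (helper names may differ slightly between versions):
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress address) {
    return getConnection(address, address);
}

public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
        InetSocketAddress physicalAddress) {
    if (maxConnectionsPerHosts == 0) {
        // pooling disabled: always open a fresh connection
        return createConnection(logicalAddress, physicalAddress, -1);
    }
    // pooling enabled: hash into one of the maxConnectionsPerHosts slots for this
    // host, so repeated calls reuse a bounded set of connections
    final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);
    return pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>())
            .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
}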
Let's first look at PulsarChannelInitializer,
which sets up the pipeline for connections to the Pulsar broker.
public void initChannel(SocketChannel ch) throws Exception {
// TLS-related
ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
// length-field based frame decoder
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
// from here on, the deserialized RPC objects are available for client-side processing;
// all of that logic actually lives in ClientCnx
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
Connection creation logic (connectToAddress):
Netty's bootstrap.connect
(TLS ignored here; see the sketch below)
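A paraphrased sketch of connectToAddress: the point is just to adapt Netty's ChannelFuture into a CompletableFuture&lt;Channel&gt; that the rest of the pool can compose on. The listener wiring is an assumption about the exact source; the pattern itself is standard Netty:
private CompletableFuture<Channel> connectToAddress(InetSocketAddress address) {
    CompletableFuture<Channel> future = new CompletableFuture<>();
    bootstrap.connect(address).addListener((ChannelFuture channelFuture) -> {
        if (channelFuture.isSuccess()) {
            future.complete(channelFuture.channel());
        } else {
            future.completeExceptionally(channelFuture.cause());
        }
    });
    return future;
}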
ClientCnx
Let's look at the class hierarchy:
public class ClientCnx extends PulsarHandler;
public abstract class PulsarHandler extends PulsarDecoder;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter;
PulsarDecoder
Earlier, when the connection was initialized, a LengthFieldBasedFrameDecoder was placed in front of this handler, so by the time channelRead is reached the buffer contains a complete frame and the RPC can be deserialized directly; the matching handleXXXXXX method is then invoked for each RPC type:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
BaseCommand cmd = null;
BaseCommand.Builder cmdBuilder = null;
try {
// De-serialize the command
int cmdSize = (int) buffer.readUnsignedInt();
int writerIndex = buffer.writerIndex();
buffer.writerIndex(buffer.readerIndex() + cmdSize);
// take a ByteBufCodedInputStream from the object pool
ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
cmdBuilder = BaseCommand.newBuilder();
// deserialize
cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
buffer.writerIndex(writerIndex);
cmdInputStream.recycle();
...
// dispatch to a different handler method depending on the RPC type
switch (cmd.getType()) {
case PARTITIONED_METADATA:
checkArgument(cmd.hasPartitionMetadata());
try {
interceptCommand(cmd);
handlePartitionMetadataRequest(cmd.getPartitionMetadata());
} catch (InterceptException e) {
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
} finally {
cmd.getPartitionMetadata().recycle();
}
break;
...
// other RPC types omitted; they all follow the same handleXXXXX pattern
} finally {
// cleanup
if (cmdBuilder != null) {
cmdBuilder.recycle();
}
if (cmd != null) {
cmd.recycle();
}
buffer.release();
}
}
PulsarHandler
This class mainly adds the implementation of the keepalive logic.
The methods themselves are easy to follow; a paraphrased sketch is shown below.
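A condensed sketch of the keepalive round-trip. Field and method names follow PulsarHandler, but this is a paraphrase, not the literal code:
// a periodic task (scheduled in channelActive at the keepAliveInterval) sends a
// Ping, and drops the connection if the previous Ping was never answered
private void handleKeepAliveTimeout() {
    if (!ctx.channel().isOpen()) {
        return;
    }
    if (waitingForPingResponse) {
        // the last Ping was never answered with a Pong: peer is unresponsive
        ctx.close();
    } else {
        waitingForPingResponse = true;
        ctx.writeAndFlush(Commands.newPing());
    }
}

// the Pong handler just clears the flag
@Override
protected void handlePong() {
    waitingForPingResponse = false;
}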
ClientCnx
This class is responsible for the interaction logic with the server.
package org.apache.pulsar.client.impl;
public class ClientCnx extends PulsarHandler {
// connection state
enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}
private State state;
//----------------------------------------------------------------------
// transient request table
// requestId -> pending request
private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
// queue of waiting lookup requests
private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
//----------------------------------------------------------------------
// the business-logic units
private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
//----------------------------------------------------------------------
// handle for the asynchronous connection establishment
private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
//----------------------------------------------------------------------
// thread pool passed in when the PulsarClient was constructed
private final EventLoopGroup eventLoopGroup;
//----------------------------------------------------------------------
// rate limiting (lookup-related)
private final Semaphore pendingLookupRequestSemaphore;
private final Semaphore maxLookupRequestSemaphore;
// connection-rejection members (lookup-related)
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
// number of rejected requests (lookup-related)
private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ClientCnx.class, "numberOfRejectRequests");
@SuppressWarnings("unused")
private volatile int numberOfRejectRequests = 0;
//----------------------------------------------------------------------
// data structure used to check whether a request has timed out
private static class RequestTime {
final long creationTimeMs;
final long requestId;
final RequestType requestType;
RequestTime(long creationTime, long requestId, RequestType requestType) {
this.creationTimeMs = creationTime;
this.requestId = requestId;
this.requestType = requestType;
}
}
// queue of in-flight requests, used for timeout detection
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
//----------------------------------------------------------------------
// maximum message size
@Getter
private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
// RPC protocol version
private final int protocolVersion;
// operation timeout
private final long operationTimeoutMs;
// handle of the task that checks for operation timeouts
private ScheduledFuture<?> timeoutTask;
//----------------------------------------------------------------------
// information about whether the connection goes through a proxy
protected String proxyToTargetBrokerAddress = null;
protected String remoteHostName = null;
// TLS-related
private boolean isTlsHostnameVerificationEnable;
private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
protected final Authentication authentication;
protected AuthenticationDataProvider authenticationDataProvider;
//----------------------------------------------------------------------
// transaction-related
private TransactionBufferHandler transactionBufferHandler;
private enum RequestType {
Command,
GetLastMessageId,
GetTopics,
GetSchema,
GetOrCreateSchema;
String getDescription() {
if (this == Command) {
return "request";
} else {
return name() + " request";
}
}
}
Let's return briefly to the ConnectionPool logic. Creating a connection actually calls Bootstrap.connect,
which returns a Netty Channel object, yet ConnectionPool hands back a ClientCnx object:
ConnectionPool
private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, int connectionKey) {
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();
// Trigger async connect to broker
createConnection(physicalAddress).thenAccept(channel -> {
....
// the ClientCnx object is actually fetched from the pipeline of the already-connected Channel
final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
....
if (!logicalAddress.equals(physicalAddress)) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
// That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
// this method.
cnx.setTargetBroker(logicalAddress);
}
// remember the remote address
cnx.setRemoteHostName(physicalAddress.getHostName());
cnx.connectionFuture().thenRun(() -> {
...
// complete the future once the connection is ready
cnxFuture.complete(cnx);
}).exceptionally(exception -> {
cnxFuture.completeExceptionally(exception);
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
cnx.ctx().close();
return null;
});
...
The main methods (responsibilities) of ClientCnx:
- Connection lifecycle management (the Netty handler methods):
  channelActive, channelInactive, exceptionCaught, ......
- Sending requests: methods that actively send an RPC and process the result according to the business logic:
  lookup requests, getLastMessageId, getSchema, .....
  The raw response is obtained via
  CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType)
- Handling responses: the handleXXXXX RPC handlers inherited from PulsarDecoder
- Checking whether requests have timed out: checkRequestTimeout
- Registering / removing business-logic objects (these get their own articles later):
  consumer, producer, transactionMetaStoreHandler, transactionBufferHandler
The sendRequestAndHandleTimeout method:
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
// put the request into the pending table so it can wait for its response
CompletableFuture<T> future = new CompletableFuture<>();
pendingRequests.put(requestId, future);
// send the raw RPC body
ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
pendingRequests.remove(requestId);
future.completeExceptionally(writeFuture.cause());
}
});
// add an entry to the timeout queue so the timeout checker can find this request
requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
return future;
}
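On the response side, each handleXXXXX method completes the matching pending future by requestId. A paraphrased sketch using the GetSchema response as an example; the exact generics and logging differ in the source:
@Override
protected void handleGetSchemaResponse(CommandGetSchemaResponse response) {
    long requestId = response.getRequestId();
    // find and remove the future that sendRequestAndHandleTimeout registered
    CompletableFuture<CommandGetSchemaResponse> future =
            (CompletableFuture<CommandGetSchemaResponse>) pendingRequests.remove(requestId);
    if (future == null) {
        log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
        return;
    }
    future.complete(response);
}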
The channelActive method
The logic here is fairly simple:
- PulsarHandler.channelActive starts the scheduled task for the keepalive logic
- ClientCnx.channelActive starts the scheduled task for the request-timeout logic
- and sends a ConnectCommand to the server (the server-side handling is covered later)
Request timeout handling
This logic is also straightforward (see the sketch below).
A periodic task scheduled on the EventLoopGroup checks whether any request in requestTimeoutQueue has timed out;
if so, that request's response is completed with a TimeoutException.
The check interval is determined by operationTimeoutMs.
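A condensed sketch of checkRequestTimeout, paraphrased from the source. The queue is ordered by creation time, so peeking at the head is enough:
private void checkRequestTimeout() {
    long now = System.currentTimeMillis();
    while (!requestTimeoutQueue.isEmpty()) {
        RequestTime request = requestTimeoutQueue.peek();
        if (request == null || now - request.creationTimeMs < operationTimeoutMs) {
            // the oldest entry has not timed out yet, so no newer one has either
            break;
        }
        request = requestTimeoutQueue.poll();
        CompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
        if (requestFuture != null && !requestFuture.isDone()) {
            String msg = String.format("%s timed out after %d ms",
                    request.requestType.getDescription(), operationTimeoutMs);
            requestFuture.completeExceptionally(new PulsarClientException.TimeoutException(msg));
        }
    }
}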
PulsarClient functionality recap
To wrap up, let's recap the overall functionality of PulsarClient:
- a connection pool that creates ClientCnx instances for talking to the server
- the custom business-logic units it holds (consumer, producer, tcClient)
- the LookupService
- some periodic checking tasks
- the schema LoadingCache
By registering themselves on a ClientCnx, the business-logic units can use that connection to send RPCs and get responses back, which are then propagated into the business logic.
To its users, PulsarClient provides an RPC-level abstraction; the other classes build their own logic on top of these RPCs.