實現(xiàn)分布式事務的核心要點:
- 事務的持久化赋焕,事務所處的各種狀態(tài)事務參與方的各種狀態(tài)都需要持久化巧号,當實例宕機時才能基于持久化的數(shù)據(jù)對事務回滾或提交浓瞪,實現(xiàn)最終一致性
- 定時對超時未完成事務的處理(繼續(xù)嘗試提交或回滾),即通過重試機制實現(xiàn)事務的最終一致性
- 分布式事務的跨服務實例傳播女轿,當分布式事務跨多個實例時需要實現(xiàn)事務的傳播,一般需要適配不同的rpc框架
- 事務的隔離級別:大多數(shù)分布式事務為了性能壕翩,默認的隔離級別是讀未提交
- 冪等性:對于XA或者seata的AT這樣的分布式事務來說蛉迹,都已經(jīng)默認實現(xiàn)了冪等性,而TCC放妈、Saga這種接口級別實現(xiàn)的分布式事務都還需要業(yè)務開發(fā)者自己實現(xiàn)冪等性北救。
本片文章主要從seata-server的啟動流程的角度介紹一下seata-server的源碼荐操,啟動流程圖如下:
1. 啟動類Server
seata-server的入口類在Server類中,源碼如下:
public static void main(String[] args) throws IOException {
//解析啟動以及配置文件的各種配置參數(shù)
ParameterParser parameterParser = new ParameterParser(args);
//metrics相關珍策,暫時不關心
MetricsManager.get().init();
// 把從配置文件中讀取到的storeMode重新寫入SystemProperty中
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
//創(chuàng)建RpcServer實例托启,此時并沒有初始化,RpcServer負責與客戶端SDK中的TM攘宙、RM進行網(wǎng)絡通信
RpcServer rpcServer = new RpcServer(WORKING_THREADS);
//server port
rpcServer.setListenPort(parameterParser.getPort());
//UUIDGenerator初始化屯耸,UUIDGenerator用于生成全局事務、分支事務的id蹭劈,
//多個Server實例配置不同的ServerNode疗绣,保證id的唯一性
UUIDGenerator.init(parameterParser.getServerNode());
// SessionHodler負責事務日志(狀態(tài))的持久化存儲,
// 當前支持file和db的存儲铺韧,集群部署模式要使用db模式
SessionHolder.init(parameterParser.getStoreMode());
// 創(chuàng)建初始化DefaultCoordinator實例多矮,DefaultCoordinator是TC的核心事務邏輯處理類,
// 底層包含了AT哈打、TCC塔逃、SAGA等不同事務類型的邏輯處理。
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
coordinator.init();
rpcServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(rpcServer);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(rpcServer.getListenPort());
try {
rpcServer.init();
} catch (Throwable e) {
LOGGER.error("rpcServer init error:{}", e.getMessage(), e);
System.exit(-1);
}
System.exit(0);
}
2. 解析配置
參數(shù)解析的實現(xiàn)代碼在ParameterParser類中料仗,init方法源碼如下:
private void init(String[] args) {
try {
// 判斷是否運行在容器中
boolean inContainer = this.isRunningInContainer();
if (inContainer) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The server is running in container.");
}
// 如果是運行在容器中湾盗,則從環(huán)境變量中獲取啟動配置參數(shù)
this.seataEnv = StringUtils.trimToNull(System.getenv(ENV_SYSTEM_KEY));
this.host = StringUtils.trimToNull(System.getenv(ENV_SEATA_IP_KEY));
this.serverNode = NumberUtils.toLong(System.getenv(ENV_SERVER_NODE_KEY), SERVER_DEFAULT_NODE);
this.port = NumberUtils.toInt(System.getenv(ENV_SEATA_PORT_KEY), SERVER_DEFAULT_PORT);
this.storeMode = StringUtils.trimToNull(System.getenv(ENV_STORE_MODE_KEY));
} else {
// 基于JCommander獲取啟動應用程序時配置的參數(shù),
// JCommande通過注解罢维、反射的方式把參數(shù)賦值到當前類的字段上淹仑。
JCommander jCommander = JCommander.newBuilder().addObject(this).build();
jCommander.parse(args);
if (help) {
jCommander.setProgramName(PROGRAM_NAME);
jCommander.usage();
System.exit(0);
}
}
if (StringUtils.isNotBlank(seataEnv)) {
System.setProperty(ENV_PROPERTY_KEY, seataEnv);
}
if (StringUtils.isBlank(storeMode)) {
// 這里牽扯到一個重要的Configuration類,ParameterParser只負責獲取ip肺孵、port匀借、storeMode
// 等核心參數(shù),其他的參數(shù)都是從Configuration中獲取的平窘。
// 這里如果沒有啟動參數(shù)沒有指定storeMode吓肋,就從Configuration類中獲取。
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}
} catch (ParameterException e) {
printError(e);
}
}
在ParameterParser的init方法中第一次調(diào)用了ConfigurationFactory.getInstance()瑰艘,初始化了一個單例的Configuration對象是鬼,Configuration負責初始化所有的其他配置參數(shù)數(shù)據(jù)信息。配置文件中的file.conf紫新、registry.conf都是在這里被處理的均蜜。
ConfigurationFactory.getInstance方法其實就是獲取一個單例對象,核心在buildConfiguration方法中芒率,不過在buidlConfiguration方法前囤耳,ConfigurationFactory類有一段static代碼塊會先執(zhí)行。
public static Configuration getInstance() {
if (instance == null) {
synchronized (Configuration.class) {
if (instance == null) {
instance = buildConfiguration();
}
}
}
return instance;
}
ConfigurationFactory有static代碼塊,下面的代碼看起來很多充择,其實只是從registry.conf中讀取配置信息德玫。registry.conf中有兩個配置信息,注冊中心和配置源椎麦。registry.conf中指定其他配置項是file.conf或者是apollo等其他配置源)
static {
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (null == seataConfigName) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (null == seataConfigName) {
seataConfigName = REGISTRY_CONF_PREFIX;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (null == envValue) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}
Configuration configuration = (null == envValue)
? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,false)
: new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX, false);
Configuration extConfiguration = null;
try {
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class)
.provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null
? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {
} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;
}
ConfigurationFactory.buildConfiguration宰僧。buildConfiguration方法主要是根據(jù)registry.conf文件中配置的其他配置項的配置源來加載更多的配置項。當前的配置源已經(jīng)支持:file观挎、zk琴儿、apollo、nacos键兜、etcd3等凤类。
private static Configuration buildConfiguration() {
// 從registry中讀取其他配置項的配置源類型
ConfigType configType;
String configTypeName = null;
try {
configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
if (StringUtils.isBlank(configTypeName)) {
throw new NotSupportYetException("config type can not be null");
}
configType = ConfigType.getType(configTypeName);
} catch (Exception e) {
throw e;
}
// 文件的配置源方式,默認讀取file.conf文件
if (ConfigType.File == configType) {
String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
Configuration configuration = new FileConfiguration(name);
Configuration extConfiguration = null;
try {
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class)
.provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}",
extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {
} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
return null == extConfiguration ? configuration : extConfiguration;
} else {
//通過SPI的方式加載其他配置源的實現(xiàn)類普气。在seata-server源代碼中可以看到
// 很多這樣通過單例和SPI的方式來獲取對象的場景谜疤。
return EnhancedServiceLoader.load(ConfigurationProvider.class,
Objects.requireNonNull(configType).name())
.provide();
}
}
3. 初始化UUIDGenerator
UUIDGenertor初始化接收一個serverNode參數(shù),UUIDGenertor當前是使用了雪花算法來生成唯一Id现诀,該serverNode用來保證多個seata-server實例生成的唯一id不重復夷磕。
public class UUIDGenerator {
/**
* Generate uuid long.
*
* @return the long
*/
public static long generateUUID() {
return IdWorker.getInstance().nextId();
}
/**
* Init.
*
* @param serverNode the server node id
*/
public static void init(Long serverNode) {
IdWorker.init(serverNode);
}
}
UUIDGenerator是對IdWorker做了封裝,唯一id的核心實現(xiàn)邏輯在IdWoker類中仔沿,IdWorker是一個雪花算法實現(xiàn)的坐桩。此處的IdWorker又是一個單例
public class IdWorker
/**
* Constructor
*
* @param workerId就是上面提到的ServerNode, 取值范圍在0·1023,也就是在64位的UUID中占10位
*/
public IdWorker(long workerId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0",
maxWorkerId));
}
this.workerId = workerId;
}
/**
* Get the next ID (the method is thread-safe)
*
* @return SnowflakeId
*/
public long nextId() {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format(
"clock moved backwards. Refusing to generate id for %d milliseconds",
lastTimestamp - timestamp));
}
synchronized (this) {
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
}
// 雪花算法64位唯一id組成:
// 第一位0 + 41位時間戳 + 10位workerId + 12位自增序列化(同一時間戳內(nèi)自增)
return ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift)
| sequence;
}
4. SessionHolder初始化
SessionHolder負責Session的持久化封锉,一個Session對象對應一個事務绵跷,事務分為兩種:全局事務(GlobalSession)和分支事務(BranchSession)。
SessionHolder支持file和db兩種持久化方式成福,其中db支持集群模式碾局,推薦使用db。SessionHolder中最主要的四個字段如下:
// ROOT_SESSION_MANAGER用于獲取所有的Setssion奴艾,以及Session的創(chuàng)建净当、更新、刪除等蕴潦。
private static SessionManager ROOT_SESSION_MANAGER;
// 用于獲取像啼、更新所有的異步commit的Session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于獲取、更新所有需要重試commit的Session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于獲取潭苞、更新所有需要重試rollback的Session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
SessionHolder的init方法
public static void init(String mode) throws IOException {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
StoreMode storeMode = StoreMode.get(mode);
if (StoreMode.DB.equals(storeMode)) {
// 這里又用到了SPI的方式加載SessionManager忽冻,其實下面獲取的四個
// SessionManager實例都是同一個類DataBaseSessionManager的不同實例,
// 只是給DataBaseSessionManager的構造函數(shù)傳參不同此疹。
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
} else if (StoreMode.FILE.equals(storeMode)) {
//file模式可以先不關心
...
} else {
throw new IllegalArgumentException("unknown store mode:" + mode);
}
// reload方法對于db模式可以忽略
reload();
}
上面看到SessionHolder中的四個SessionManager本質(zhì)都是類DataBaseSessionManager的實例甚颂,只是給構造函數(shù)傳參不同蜜猾,看下DataBaseSessionManager的定義:
public DataBaseSessionManager(String name) {
super();
this.taskName = name;
}
// 根據(jù)實例的taskName來決定allSessions返回的事務列表,
// 如taskName等于ASYNC_COMMITTING_SESSION_MANAGER_NAME的就返回所有狀態(tài)為AsyncCommitting的事務振诬。
public Collection<GlobalSession> allSessions() {
// get by taskName
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying}));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
} else {
// taskName為null,則對應ROOT_SESSION_MANAGER衍菱,即獲取所有狀態(tài)的事務
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying,
GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying,
GlobalStatus.AsyncCommitting}));
}
}
5. 初始化DefaultCoordinator
DefaultCoordinator是事務協(xié)調(diào)器的核心赶么,如:開啟、提交脊串、回滾全局事務辫呻,注冊、提交琼锋、回滾分支事務都是由DefaultCoordinator負責協(xié)調(diào)處理的放闺。DefaultCoordinato通過RpcServer與遠程的TM、RM通信來實現(xiàn)分支事務的提交缕坎、回滾等怖侦。
public DefaultCoordinator(ServerMessageSender messageSender) {
// 接口messageSender的實現(xiàn)類就是上文提到的RpcServer
this.messageSender = messageSender;
// DefaultCore封裝了AT、TCC谜叹、Saga等分布式事務模式的具體實現(xiàn)類
this.core = new DefaultCore(messageSender);
}
//init方法初始化了5個定時器匾寝,主要用于分布式事務的重試機制,
// 因為分布式環(huán)境的不穩(wěn)定性會造成事務處于中間狀態(tài)荷腊,
// 所以要通過不斷的重試機制來實現(xiàn)事務的最終一致性艳悔。
// 下面的定時器除了undoLogDelete之外,其他的定時任務默認都是1秒執(zhí)行一次女仰。
public void init() {
//處理處于回滾狀態(tài)可重試的事務
retryRollbacking.scheduleAtFixedRate(() -> {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//處理二階段可以重試提交的狀態(tài)可重試的事務
retryCommitting.scheduleAtFixedRate(() -> {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//處理異步提交的事務
asyncCommitting.scheduleAtFixedRate(() -> {
try {
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//檢查事務的第一階段已經(jīng)超時的事務猜年,設置為TimeoutRollbacking,
// 由其他定時任務執(zhí)行回滾操作
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// 根據(jù)unlog的保存天數(shù)調(diào)用RM刪除unlog
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
6. 初始化RpcServer
RpcServer是基于Netty實現(xiàn)的簡化版的Rpc服務端疾忍,RpcServer初始化時主要做了兩件事:
- 初始化Netty乔外,設置ChannelHandler,啟動Netty
- 把當前實例的IP端口注冊到注冊中心中(根據(jù)registry中的注冊中心類型以及地址配置注冊)
public void init() {
// 響應Rpc客戶端的邏輯處理
DefaultServerMessageListenerImpl defaultServerMessageListenerImpl =
new DefaultServerMessageListenerImpl(getTransactionMessageHandler());
defaultServerMessageListenerImpl.init();
defaultServerMessageListenerImpl.setServerMessageSender(this);
super.setServerMessageListener(defaultServerMessageListenerImpl);
super.setChannelHandlers(new ServerHandler());
super.init();
}
@Override
public void init() {
super.init();
// 調(diào)用Netty初始化邏輯
serverBootstrap.start();
}
// Netty初始化邏輯
public void start() {
//netty初始化邏輯
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig
.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (null != channelHandlers) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
try {
ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
LOGGER.info("Server started ... ");
//向注冊中心注冊當前實例
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布锭碳!