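Introduction
This article explains how the TC (Transaction Coordinator) handles the begin operation (the request to start a global transaction) sent by the TM (Transaction Manager).
The core logic covers creating the GlobalSession object, persisting the GlobalSession, and generating the XID. Another key point is that the GlobalSession status is initialized to GlobalStatus.Begin.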
TC begin flow
1. Create the GlobalSession object via GlobalSession.createGlobalSession().
2. Add a lifecycle listener to the GlobalSession; the listener is the DefaultSessionManager.
3. Begin the GlobalSession, which notifies its lifecycle listeners; the listener adds the GlobalSession to the global sessionMap.
4. In the same notification, the listener persists the GlobalSession (in addGlobalSession() the write is issued just before the sessionMap insertion).
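The four steps above can be sketched with a few simplified stand-in types. This is only an illustration of the listener pattern: BeginFlowSketch, Listener, Session, Manager and persistCalls are hypothetical names, not the real Seata classes, and the persistence step is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for the Seata types; not the real classes.
class BeginFlowSketch {

    interface Listener {
        void onBegin(Session session);
    }

    static class Session {
        final long transactionId;
        boolean active;
        private final List<Listener> listeners = new ArrayList<>();

        Session(long transactionId) {
            this.transactionId = transactionId;
        }

        void addListener(Listener listener) {
            listeners.add(listener);
        }

        // Marks the session active and notifies every registered listener.
        void begin() {
            active = true;
            for (Listener listener : listeners) {
                listener.onBegin(this);
            }
        }
    }

    static class Manager implements Listener {
        final Map<Long, Session> sessionMap = new ConcurrentHashMap<>();
        int persistCalls; // counts how often the persistence hook ran

        @Override
        public void onBegin(Session session) {
            persistCalls++;                                 // stands in for writeSession(...)
            sessionMap.put(session.transactionId, session); // register in memory
        }
    }

    // Runs the four steps and returns the manager for inspection.
    static Manager run(long transactionId) {
        Manager manager = new Manager();
        Session session = new Session(transactionId); // step 1
        session.addListener(manager);                 // step 2
        session.begin();                              // steps 3 and 4
        return manager;
    }

    public static void main(String[] args) {
        Manager manager = run(1001L);
        System.out.println("registered: " + manager.sessionMap.containsKey(1001L));
    }
}
```

The point of the design is that the session itself knows nothing about storage; whatever is registered as a listener decides how a begun session is recorded.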
TC begin source code analysis
public class DefaultCore implements Core {
    @Override
    public String begin(String applicationId, String transactionServiceGroup,
                        String name, int timeout)
        throws TransactionException {
        // Create the GlobalSession object
        GlobalSession session = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);
        // Register the root session manager as a lifecycle listener of the GlobalSession
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Begin the GlobalSession
        session.begin();
        // Return the newly generated XID
        return XID.generateXID(session.getTransactionId());
    }
}
Notes:
GlobalSession.createGlobalSession() creates the GlobalSession object.
session.addSessionLifecycleListener() registers a lifecycle listener on the GlobalSession.
session.begin() saves the GlobalSession through its lifecycle listeners.
SessionHolder.getRootSessionManager() returns the root session manager: a DefaultSessionManager when no session store path is configured, otherwise a FileBasedSessionManager.
XID.generateXID() creates the XID value from the transactionId.
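The excerpt does not show the body of XID.generateXID(), so the following is only a sketch of one plausible scheme. It assumes the XID is the TC's address, its port and the transactionId joined by colons. The class XidSketch, its methods, and the address 127.0.0.1:8091 are illustrative assumptions, not taken from the source.

```java
// Hypothetical sketch; the "ip:port:transactionId" format is an assumption.
class XidSketch {
    private final String ipAddress;
    private final int port;

    XidSketch(String ipAddress, int port) {
        this.ipAddress = ipAddress;
        this.port = port;
    }

    // Builds an XID string from the coordinator address and the transaction id.
    String generateXid(long transactionId) {
        return ipAddress + ":" + port + ":" + transactionId;
    }

    // Recovers the transaction id from the part after the last colon.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        XidSketch xid = new XidSketch("127.0.0.1", 8091);
        System.out.println(xid.generateXid(1001L));
    }
}
```

Under this assumption, embedding the address would let a participant tell which coordinator issued the transaction, while the numeric suffix maps back to the key used in sessionMap.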
public class GlobalSession implements SessionLifecycle, SessionStorable {
    // Container for the lifecycle listeners
    private ArrayList<SessionLifecycleListener> lifecycleListeners
        = new ArrayList<>();

    public static GlobalSession createGlobalSession(String applicationId,
            String txServiceGroup, String txName, int timeout) {
        GlobalSession session =
            new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    public GlobalSession(String applicationId, String transactionServiceGroup,
            String transactionName, int timeout) {
        // Generate the transactionId.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    // Add a lifecycle listener
    public void addSessionLifecycleListener(
            SessionLifecycleListener sessionLifecycleListener) {
        lifecycleListeners.add(sessionLifecycleListener);
    }

    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
}

// Class and method that generate the transactionId
public class UUIDGenerator {
    private static AtomicLong UUID = new AtomicLong(1000);
    private static int UUID_INTERNAL = 200000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > 2000000000) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= 2000000000;
                    UUID.set(id);
                }
            }
        }
        return id;
    }
}
Notes:
The GlobalSession constructor generates the transactionId via UUIDGenerator.generateUUID().
addSessionLifecycleListener() registers the lifecycle listener, here the DefaultSessionManager.
begin() calls onBegin() on each lifecycle listener (lifecycleListener.onBegin), which is where the GlobalSession gets persisted.
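To make the counter behavior of generateUUID() easier to observe, here is a minimal sketch that copies its logic into an instance with a configurable start value. IdWrapSketch, LIMIT and next() are hypothetical names introduced for the illustration; the real class uses a static counter that starts at 1000.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical re-statement of the counter logic with an adjustable start value.
class IdWrapSketch {
    private static final long LIMIT = 2_000_000_000L;
    private final AtomicLong counter;

    IdWrapSketch(long start) {
        this.counter = new AtomicLong(start);
    }

    // Increments the counter; once it passes LIMIT it is pulled back by LIMIT.
    long next() {
        long id = counter.incrementAndGet();
        if (id > LIMIT) {
            synchronized (counter) {
                // Only reset if no other thread has already wrapped the counter.
                if (counter.get() >= id) {
                    id -= LIMIT;
                    counter.set(id);
                }
            }
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(new IdWrapSketch(1000).next());
        IdWrapSketch nearLimit = new IdWrapSketch(2_000_000_000L);
        System.out.println(nearLimit.next());
        System.out.println(nearLimit.next());
    }
}
```

Two things follow from this shape: ids are only unique until the counter wraps, and because the counter lives in memory it starts over whenever the process restarts.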
public class SessionHolder {
    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";

    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }

    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER == null) {
            throw new ShouldNeverHappenException("SessionManager is NOT init!");
        }
        return ROOT_SESSION_MANAGER;
    }
}
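Notes:
- getRootSessionManager() returns the DefaultSessionManager object (when sessionStorePath is null; otherwise init() creates a FileBasedSessionManager), which implements the lifecycle listener interface.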
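The branch in SessionHolder.init() can be condensed into a small decision function. The sketch below only mirrors the choice of root manager and the trailing-slash normalization; RootManagerChoiceSketch, its methods, the returned description strings and the path /tmp/store are hypothetical and exist only for illustration.

```java
// Hypothetical sketch of the root-manager choice; returns descriptions, not managers.
class RootManagerChoiceSketch {

    // Appends a trailing slash when the store path lacks one.
    static String normalize(String sessionStorePath) {
        return sessionStorePath.endsWith("/") ? sessionStorePath : sessionStorePath + "/";
    }

    // Describes which kind of root session manager init() would create.
    static String chooseRootManager(String sessionStorePath) {
        if (sessionStorePath == null) {
            return "DefaultSessionManager(root.data)";
        }
        return "FileBasedSessionManager(root.data, " + normalize(sessionStorePath) + ")";
    }

    public static void main(String[] args) {
        System.out.println(chooseRootManager(null));
        System.out.println(chooseRootManager("/tmp/store"));
    }
}
```

Only the root manager changes with the store path; the other three managers are DefaultSessionManager instances in both branches of init().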
public class DefaultSessionManager extends AbstractSessionManager {
    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {
            @Override
            public boolean writeSession(LogOperation logOperation,
                                        SessionStorable session) {
                return false;
            }

            @Override
            public void shutdown() {
            }

            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize,
                                                                      boolean isHistory) {
                return null;
            }

            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }
        };
    }
}

public abstract class AbstractSessionManager
        implements SessionManager, SessionLifecycleListener {
    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);
    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();
    protected TransactionStoreManager transactionStoreManager;
    protected String name;

    public AbstractSessionManager(String name) {
        this.name = name;
    }

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);
    }

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
}
Notes:
- DefaultSessionManager is the lifecycle manager for GlobalSession.
- Its parent class AbstractSessionManager implements the SessionLifecycleListener interface.
- DefaultSessionManager inherits onBegin() from AbstractSessionManager.
- onBegin() calls addGlobalSession() to add the GlobalSession.
- addGlobalSession() calls transactionStoreManager.writeSession() to persist the session; the anonymous TransactionStoreManager defined in DefaultSessionManager does nothing and simply returns false.
- transactionStoreManager is the TransactionStoreManager object created in the DefaultSessionManager constructor.
- addGlobalSession() then calls sessionMap.put() to keep the GlobalSession in memory.
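The interaction between the no-op store and the in-memory map can be sketched as follows. The sketch assumes a reduced store interface with a single method; NoOpStoreSketch, StoreSketch, ManagerSketch and lastWriteResult are hypothetical names, and the session is represented by a plain string.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a store that does nothing, and a manager that still tracks sessions.
class NoOpStoreSketch {

    interface StoreSketch {
        boolean writeSession(String operation, long transactionId);
    }

    // Mirrors the anonymous store in DefaultSessionManager: nothing is written.
    static final StoreSketch NO_OP = (operation, transactionId) -> false;

    static class ManagerSketch {
        final Map<Long, String> sessionMap = new ConcurrentHashMap<>();
        private final StoreSketch store;
        boolean lastWriteResult; // kept only so the result can be inspected

        ManagerSketch(StoreSketch store) {
            this.store = store;
        }

        // Attempts the write first, then registers the session regardless of the result.
        void addGlobalSession(long transactionId, String sessionName) {
            lastWriteResult = store.writeSession("GLOBAL_ADD", transactionId);
            sessionMap.put(transactionId, sessionName);
        }
    }

    public static void main(String[] args) {
        ManagerSketch manager = new ManagerSketch(NO_OP);
        manager.addGlobalSession(1001L, "demo-tx");
        System.out.println(manager.lastWriteResult + " " + manager.sessionMap.get(1001L));
    }
}
```

As in the excerpt, the return value of writeSession() does not gate the map insertion, so with the no-op store a session exists only in memory and would not survive a restart.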