* 最后我们会进入`com.arjuna.ats.arjuna.coordinator.BasicAction.End()`方法,会首先判断是否能优化成一阶段提交,否则进行二阶段提交(二阶段提交还可以使用异步线程池方式)。代码如下:
if (doOnePhase())
{
onePhaseCommit(reportHeuristics);
ActionManager.manager().remove(get_uid());
}
else
{
int prepareStatus = prepare(reportHeuristics);
if (prepareStatus == TwoPhaseOutcome.PREPARE_NOTOK
|| prepareStatus == TwoPhaseOutcome.ONE_PHASE_ERROR) {
tsLogger.i18NLogger.warn_coordinator_BasicAction_36(get_uid());
if (heuristicDecision != TwoPhaseOutcome.PREPARE_OK) {
tsLogger.i18NLogger.warn_coordinator_BasicAction_37(TwoPhaseOutcome.stringForm(heuristicDecision));
}
tsLogger.i18NLogger.warn_coordinator_BasicAction_38();
if (!reportHeuristics && TxControl.asyncCommit
&& (parentAction == null)) {
TwoPhaseCommitThreadPool.submitJob(new AsyncCommit(this, false));
} else
phase2Abort(reportHeuristics); /* first phase failed */
}
else
{
if (!reportHeuristics && TxControl.asyncCommit
&& (parentAction == null))
{
TwoPhaseCommitThreadPool.submitJob(new AsyncCommit(this, true));
}
else
phase2Commit(reportHeuristics); /* first phase succeeded */
}
}
}
### **一阶段提交**
进入方法 `onePhaseCommit`, 最后会调用`com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit()`。该方法首先会发起 XA end 语句,然后再执行XA commit语句。代码如下:
//省略相关代码
//执行XA end语句
endAssociation(XAResource.TMSUCCESS, TxInfo.NOT_ASSOCIATED);
//执行XA commit
_theXAResource.commit(_tranID, true);
### **二阶段提交**
* 首先会进行进入 `prepare(reportHeuristics);`, 最后会调用`com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelPrepare()`该方法首先会执行 XA end 语句,然后执行 XA prepare语句。代码如下:
//省略相关代码
//执行XA end语句
endAssociation(XAResource.TMSUCCESS, TxInfo.NOT_ASSOCIATED);
//执行XA prepare
theXAResource.prepare(_tranID)
* 接下来进行提交,进入方法 `phase2Commit`, 最后会调用`com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelCommit()`。该方法会执行XA commit语句。代码如下:
//省略相关代码
//执行XA commit
_theXAResource.commit(_tranID, fase);
### **Narayana 回滚流程**
首先我们先切换回`org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback()` 方法,然后会进入 `com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.rollback()` 方法,代码如下:
public void rollback() throws java.lang.IllegalStateException,
java.lang.SecurityException, javax.transaction.SystemException
{
if (jtaLogger.logger.isTraceEnabled()) {
jtaLogger.logger.trace(“BaseTransaction.rollback”);
}
TransactionImple theTransaction = TransactionImple.getTransaction();
if (theTransaction == null)
throw new IllegalStateException(
"BaseTransaction.rollback - "
+ jtaLogger.i18NLogger.get_transaction_arjunacore_notx());
theTransaction.rollbackAndDisassociate();
}
* 代码最后后进入`com.arjuna.ats.arjuna.coordinator.BasicAction.topLevelAbort()`。代码如下:
//省略代码
//先执行XA end 语句
endAssociation(XAResource.TMFAIL, TxInfo.FAILED);
//然后执行XA rollback
_theXAResource.rollback(_tranID);
* 接下来就是清除换成,清除事务日志。代码如下:
ActionManager.manager().remove(get_uid());
actionStatus = ActionStatus.ABORTED;
if (TxStats.enabled()) {
TxStats.getInstance().incrementAbortedTransactions();
if (applicationAbort)
TxStats.getInstance().incrementApplicationRollbacks();
}
总结:可以看到回滚流程会稍微毕竟简单。先执行XA end语句,然后执行XA rollback语句。
文章到此,已经写的很长很多了,我们分析了ShardingSphere对于XA方案,提供了一套SPI解决方案,对Narayana进行了整合,也分析了Narayana初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程。下一篇文章,我们来详解narayana的事务恢复流程。
**关于我们**
Apache ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立的产品组成。他们均提供标准化的数据分片、分布式事务、数据迁移、数据库治理和管控界面功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。
Apache ShardingSphere不断践行Apache Way,致力于打造充满活力、规范、互助的社区!开源路上,我们欢迎你的加入。
作者介绍: 肖宇,Apache ShardingSphere Committer,开源hmily分布式事务框架作者,
开源soul网关作者,热爱开源,追求写优雅代码。目前就职入京东数科,参与ShardingSphere的开源建设,以及分布式数据库的研发工作。
感谢支持 OpenSEC