查看原文
其他

数据库中间件 Sharding-JDBC 源码分析 —— 事务(一)之BED

2017-11-06 芋艿 芋道源码

摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/transaction-bed/ 「芋道源码」欢迎转载,保留摘要,谢谢!

微信排版崩崩的,建议使用 PC 点击【阅读原文】。

本文主要基于 Sharding-JDBC 1.5.0 正式版

  • 1. 概述

  • 2. 最大努力送达型

  • 3. 柔性事务管理器

    • 3.3.1 创建柔性事务

    • 3.1 概念

    • 3.2 柔性事务配置

    • 3.3 柔性事务

  • 4. 事务日志存储器

    • 4.1 #add()

    • 4.2 #remove()

    • 4.3 #findEligibleTransactionLogs()

    • 4.4 #increaseAsyncDeliveryTryTimes()

    • 4.5 #processData()

  • 5. 最大努力送达型事务监听器

  • 6. 最大努力送达型异步作业

    • 6.1 BestEffortsDeliveryJob

    • 6.2 AsyncSoftTransactionJobConfiguration

    • 6.3 Elastic-Job 是否必须?

  • 7. 适用场景

  • 8. 开发指南 & 开发示例

  • 666. 彩蛋

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

数据库表分库后,业务场景下的单库本地事务可能变成跨库分布式事务。虽然我们可以通过合适的分库规则让操作的数据在同库下,继续保证单库本地事务,这也是非常推崇的,但不是所有场景下都能适用。如果这些场景对事务的一致性有要求,我们就不得不解决分布式事务的“麻烦”。

分布式事务是个很大的话题,我们来看看 Sharding-JDBC 对她的权衡:

Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。我们已明确规划线路图,未来会支持最终一致性的柔性事务。

Sharding-JDBC 提供了两种 柔性事务

  • 最大努力送达型 BED :已经实现

  • 事务补偿型 TCC :计划中

本文分享 最大努力送达型 的实现。建议前置阅读:《Sharding-JDBC 源码分析 —— SQL 执行》。

Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
登记吧,骚年!传送门

2. 最大努力送达型

概念

在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。

从概念看,可能不是很直白的理解是什么意思,本文会最大努力让你干净理解。

架构图

执行过程有 四种 情况:

  1. 【红线】执行成功

  2. 【棕线】执行失败,同步重试成功

  3. 【粉线】执行失败,同步重试失败,异步重试成功

  4. 【绿线】执行失败,同步重试失败,异步重试失败,事务日志保留

整体成漏斗倒三角,上一个阶段失败,交给下一个阶段重试:

整个过程通过如下 组件 完成:

  • 柔性事务管理器

  • 最大努力送达型柔性事务

  • 最大努力送达型事务监听器

  • 事务日志存储器

  • 最大努力送达型异步作业

下面,我们逐节分享每个组件。

3. 柔性事务管理器

3.1 概念

柔性事务管理器,SoftTransactionManager 实现,负责对柔性事务配置( SoftTransactionConfiguration ) 、柔性事务( AbstractSoftTransaction )的管理。

3.2 柔性事务配置

调用 #init() 初始化柔性管理器:

  1. // SoftTransactionManager.java

  2. /**

  3. * 柔性事务配置对象

  4. */

  5. @Getter

  6. private final SoftTransactionConfiguration transactionConfig;  

  7. // SoftTransactionManager.java

  8. /**

  9. * 初始化事务管理器.

  10. */

  11. public void init() throws SQLException {

  12.   // 初始化 最大努力送达型事务监听器

  13.   EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());

  14.   // 初始化 事务日志数据库存储表

  15.   if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {

  16.       Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());

  17.       createTable();

  18.   }

  19.   // 初始化 内嵌的最大努力送达型异步作业

  20.   if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {

  21.       new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();

  22.   }

  23. }

  • 将最大努力送达型事务监听器( BestEffortsDeliveryListener )注册到事务总线 ( EventBus )。在『最大努力送达型事务监听器』小节会详细分享

  • 当使用数据库存储事务日志( TransactionLog ) 时,若事务日志表( transaction_log )不存在则进行创建。在『事务日志存储器』小节会详细分享

  • 当配置使用内嵌的最大努力送达型异步作业( NestedBestEffortsDeliveryJob ) 时,进行初始化。在『最大努力送达型异步作业』小节会详细分享

SoftTransactionConfiguration

SoftTransactionConfiguration,柔性事务配置对象。

  1. public class SoftTransactionConfiguration {

  2.    /**

  3.     * 事务管理器管理的数据源.

  4.     */

  5.    @Getter(AccessLevel.NONE)

  6.    private final DataSource targetDataSource;

  7.    /**

  8.     * 同步的事务送达的最大尝试次数.

  9.     */

  10.    private int syncMaxDeliveryTryTimes = 3;

  11.    /**

  12.     * 事务日志存储类型.

  13.     */

  14.    private TransactionLogDataSourceType storageType = RDB;

  15.    /**

  16.     * 存储事务日志的数据源.

  17.     */

  18.    private DataSource transactionLogDataSource;

  19.    /**

  20.     * 内嵌的最大努力送达型异步作业配置对象.

  21.     */

  22.    private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();

  23. }

3.3 柔性事务

在 Sharding-JDBC 里,目前柔性事务分成两种:

  • BEDSoftTransaction :最大努力送达型柔性事务

  • TCCSoftTransaction :TCC型柔性事务

继承 AbstractSoftTransaction

  1. public abstract class AbstractSoftTransaction {

  2.    /**

  3.     * 分片连接原自动提交状态

  4.     */

  5.    private boolean previousAutoCommit;

  6.    /**

  7.     * 分片连接

  8.     */

  9.    @Getter

  10.    private ShardingConnection connection;

  11.    /**

  12.     * 事务类型

  13.     */

  14.    @Getter

  15.    private SoftTransactionType transactionType;

  16.    /**

  17.     * 事务编号

  18.     */

  19.    @Getter

  20.    private String transactionId;

  21. }

AbstractSoftTransaction 实现了开启柔性事务、关闭柔性事务两个方法提供给子类调用:

  • #beginInternal()

    • 对异常处理的代码:ExecutorExceptionHandler#setExceptionThrown()

    • 对于其他 SQL,不会因为 SQL 错误不执行,会继续执行

    • 对于上层业务,不会因为 SQL 错误终止逻辑,会继续执行。这里有一点要注意下,上层业务不能对该 SQL 执行结果有强依赖,因为 SQL 错误需要重试达到数据最终一致性

    • 对于最大努力型事务( TCC暂未实现 ),会对执行错误的 SQL 进行重试

    • 调用 ExecutorExceptionHandler.setExceptionThrown(false) 设置执行 SQL 错误时,也不抛出异常。

    • 调用 connection.setAutoCommit(true);,设置执行自动提交。使用最大努力型事务时,上层业务执行 SQL 会马上提交,即使调用 Connection#rollback() 也是无法回滚的,这点一定要注意。

  1. /**

  2. * 开启柔性

  3. *

  4. * @param conn 分片连接

  5. * @param type 事务类型

  6. * @throws SQLException

  7. */

  8. protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {

  9.   // TODO 判断如果在传统事务中,则抛异常

  10.   Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");

  11.   // 设置执行错误,不抛出异常

  12.   ExecutorExceptionHandler.setExceptionThrown(false);

  13.   connection = (ShardingConnection) conn;

  14.   transactionType = type;

  15.   // 设置自动提交状态

  16.   previousAutoCommit = connection.getAutoCommit();

  17.   connection.setAutoCommit(true);

  18.   // 生成事务编号

  19.   // TODO 替换UUID为更有效率的id生成器

  20.   transactionId = UUID.randomUUID().toString();

  21. }

  • #end()

    • 事务结束后,一定要记得调用 #end() 清理线程变量。否则,下次请求使用到该线程,会继续在这个柔性事务内。

    1. /**

    2. * 结束柔性事务.

    3. */

    4. public final void end() throws SQLException {

    5.  if (connection != null) {

    6.      ExecutorExceptionHandler.setExceptionThrown(true);

    7.      connection.setAutoCommit(previousAutoCommit);

    8.      SoftTransactionManager.closeCurrentTransactionManager();

    9.  }

    10. }

    11. // SoftTransactionManager.java

    12. /**

    13. * 关闭当前的柔性事务管理器.

    14. */

    15. static void closeCurrentTransactionManager() {

    16.   ExecutorDataMap.getDataMap().put(TRANSACTION, null);

    17.   ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);

    18. }

    BEDSoftTransaction

    BEDSoftTransaction,最大努力送达型柔性事务。

    1. public class BEDSoftTransaction extends AbstractSoftTransaction {

    2.    /**

    3.     * 开启柔性事务.

    4.     *

    5.     * @param connection 数据库连接对象

    6.     */

    7.    public void begin(final Connection connection) throws SQLException {

    8.        beginInternal(connection, SoftTransactionType.BestEffortsDelivery);

    9.    }

    10. }

    TCCSoftTransaction

    TCCSoftTransaction,TCC 型柔性事务,暂未实现。实现后,会更新到 《Sharding-JDBC 源码分析 —— 分布式事务(二)之事务补偿型》。


    3.3.1 创建柔性事务

    通过调用 SoftTransactionManager#getTransaction() 创建柔性事务对象:

    1. /**

    2. * {@link ExecutorDataMap#dataMap} 柔性事务对象 key

    3. */

    4. private static final String TRANSACTION = "transaction";

    5. /**

    6. * {@link ExecutorDataMap#dataMap} 柔性事务配置 key

    7. */

    8. private static final String TRANSACTION_CONFIG = "transactionConfig";

    9. // SoftTransactionManager.java

    10. /**

    11. * 创建柔性事务.

    12. *

    13. * @param type 柔性事务类型

    14. * @return 柔性事务

    15. */

    16. public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {

    17.   AbstractSoftTransaction result;

    18.   switch (type) {

    19.       case BestEffortsDelivery:

    20.           result = new BEDSoftTransaction();

    21.           break;

    22.       case TryConfirmCancel:

    23.           result = new TCCSoftTransaction();

    24.           break;

    25.       default:

    26.           throw new UnsupportedOperationException(type.toString());

    27.   }

    28.   // TODO 目前使用不支持嵌套事务,以后这里需要可配置

    29.   if (getCurrentTransaction().isPresent()) {

    30.       throw new UnsupportedOperationException("Cannot support nested transaction.");

    31.   }

    32.   ExecutorDataMap.getDataMap().put(TRANSACTION, result);

    33.   ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);

    34.   return result;

    35. }

    • 后续可以从 ExecutorDataMap 中获取当前线程的柔性事务和柔性事务配置:

    1. // SoftTransactionManager.java

    2. /**

    3. * 获取当前线程的柔性事务配置.

    4. *

    5. * @return 当前线程的柔性事务配置

    6. */

    7. public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {

    8.   Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);

    9.   return (null == transactionConfig)

    10.           ? Optional.<SoftTransactionConfiguration>absent()

    11.           : Optional.of((SoftTransactionConfiguration) transactionConfig);

    12. }

    13. /**

    14. * 获取当前的柔性事务.

    15. *

    16. * @return 当前的柔性事务

    17. */

    18. public static Optional<AbstractSoftTransaction> getCurrentTransaction() {

    19.   Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);

    20.   return (null == transaction)

    21.           ? Optional.<AbstractSoftTransaction>absent()

    22.           : Optional.of((AbstractSoftTransaction) transaction);

    23. }

    4. 事务日志存储器

    柔性事务执行过程中,会通过事务日志( TransactionLog ) 记录每条 SQL 执行状态:

    • SQL 执行前,记录一条事务日志

    • SQL 执行成功,移除对应的事务日志

    通过实现事务日志存储器接口( TransactionLogStorage ),提供存储功能。目前有两种实现:

    • MemoryTransactionLogStorage :基于内存的事务日志存储器。主要用于开发测试,生产环境下不要使用

    • RdbTransactionLogStorage :基于数据库的事务日志存储器。

    本节只分析 RdbTransactionLogStorage。对 MemoryTransactionLogStorage 感兴趣的同学可以点击链接传送到达。

    TransactionLogStorage 有五个接口方法,下文每个小标题都是一个方法。

    4.1 #add()

    1. // TransactionLogStorage.java

    2. /**

    3. * 存储事务日志.

    4. *

    5. * @param transactionLog 事务日志

    6. */

    7. void add(TransactionLog transactionLog);

    8. // RdbTransactionLogStorage.java

    9. @Override

    10. public void add(final TransactionLog transactionLog) {

    11.   String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";

    12.   try (

    13.    // ... 省略你熟悉的代码

    14.   } catch (final SQLException ex) {

    15.       throw new TransactionLogStorageException(ex);

    16.   }

    17. }

    • 注意:如果插入事务日志失败,SQL 会继续执行,如果此时 SQL 执行失败,则该 SQL 会不见了。建议: #add() 和下文的 #remove() 异常时,都打印下异常日志都文件系统

    TransactionLog (transaction_log) 数据库表结构如下:

    字段名字数据库类型备注
    id事件编号VARCHAR(40)EventBus 事件编号,非事务编号
    transaction_type柔性事务类型VARCHAR(30)
    data_source真实数据源名VARCHAR(255)
    sql执行 SQLTEXT已经改写过的 SQL
    parameters占位符参数TEXTJSON 字符串存储
    creation_time记录时间LONG
    asyncdeliverytry_times已异步重试次数INT

    4.2 #remove()

    1. // TransactionLogStorage.java

    2. /**

    3. * 根据主键删除事务日志.

    4. *

    5. * @param id 事务日志主键

    6. */

    7. void remove(String id);

    8. // RdbTransactionLogStorage.java    

    9. @Override

    10. public void remove(final String id) {

    11.   String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";

    12.   try (

    13.          // ... 省略你熟悉的代码

    14.   } catch (final SQLException ex) {

    15.       throw new TransactionLogStorageException(ex);

    16.   }

    17. }

    4.3 #findEligibleTransactionLogs()

    1. // TransactionLogStorage.java

    2. /**

    3. * 读取需要处理的事务日志.

    4. *

    5. * <p>需要处理的事务日志为: </p>

    6. * <p>1. 异步处理次数小于最大处理次数.</p>

    7. * <p>2. 异步处理的事务日志早于异步处理的间隔时间.</p>

    8. *

    9. * @param size 获取日志的数量

    10. * @param maxDeliveryTryTimes 事务送达的最大尝试次数

    11. * @param maxDeliveryTryDelayMillis 执行送达事务的延迟毫秒数.

    12. */

    13. List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);

    14. // RdbTransactionLogStorage.java

    15. @Override

    16. public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {

    17.   List<TransactionLog> result = new ArrayList<>(size);

    18.   String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "

    19.       + "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";

    20.   try (Connection conn = dataSource.getConnection()) {

    21.       // ... 省略你熟悉的代码

    22.   } catch (final SQLException ex) {

    23.       throw new TransactionLogStorageException(ex);

    24.   }

    25.   return result;

    26. }

    4.4 #increaseAsyncDeliveryTryTimes()

    1. // TransactionLogStorage.java

    2. /**

    3. * 增加事务日志异步重试次数.

    4. *

    5. * @param id 事务主键

    6. */

    7. void increaseAsyncDeliveryTryTimes(String id);

    8. // RdbTransactionLogStorage.java

    9. @Override

    10. public void increaseAsyncDeliveryTryTimes(final String id) {

    11.   String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";

    12.   try (

    13.       // ... 省略你熟悉的代码

    14.   } catch (final SQLException ex) {

    15.       throw new TransactionLogStorageException(ex);

    16.   }

    17. }

    4.5 #processData()

    1. // TransactionLogStorage.java

    2. /**

    3. * 处理事务数据.

    4. *

    5. * @param connection 业务数据库连接

    6. * @param transactionLog 事务日志

    7. * @param maxDeliveryTryTimes 事务送达的最大尝试次数

    8. */

    9. boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);

    10. // RdbTransactionLogStorage.java

    11. @Override

    12. public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {

    13.   // 重试执行失败 SQL

    14.   try (

    15.       Connection conn = connection;

    16.       PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {

    17.       for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {

    18.           preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));

    19.       }

    20.       preparedStatement.executeUpdate();

    21.   } catch (final SQLException ex) {

    22.       // 重试失败,更新事务日志,增加已异步重试次数

    23.       increaseAsyncDeliveryTryTimes(transactionLog.getId());

    24.       throw new TransactionCompensationException(ex);

    25.   }

    26.   // 移除重试执行成功 SQL 对应的事务日志

    27.   remove(transactionLog.getId());

    28.   return true;

    29. }

    • 不同于前四个增删改查接口方法的实现, #processData() 是带有一些逻辑的。根据事务日志( TransactionLog )重试执行失败的 SQL,若成功,移除事务日志;若失败,更新事务日志,增加已异步重试次数

    • 该方法会被最大努力送达型异步作业调用到

    5. 最大努力送达型事务监听器

    最大努力送达型事务监听器,BestEffortsDeliveryListener,负责记录事务日志、同步重试执行失败 SQL。

    1. // BestEffortsDeliveryListener.java

    2. @Subscribe

    3. @AllowConcurrentEvents

    4. public void listen(final DMLExecutionEvent event) {

    5.   if (!isProcessContinuously()) {

    6.       return;

    7.   }

    8.   SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();

    9.   TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());

    10.   BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();

    11.   switch (event.getEventExecutionType()) {

    12.       case BEFORE_EXECUTE: // 执行前,插入事务日志

    13.           //TODO 对于批量执行的SQL需要解析成两层列表

    14.           transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),

    15.                   event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));

    16.           return;

    17.       case EXECUTE_SUCCESS: // 执行成功,移除事务日志

    18.           transactionLogStorage.remove(event.getId());

    19.           return;

    20.       case EXECUTE_FAILURE: // 执行失败,同步重试

    21.           boolean deliverySuccess = false;

    22.           for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【多次】重试

    23.               if (deliverySuccess) {

    24.                   return;

    25.               }

    26.               boolean isNewConnection = false;

    27.               Connection conn = null;

    28.               PreparedStatement preparedStatement = null;

    29.               try {

    30.                   // 获得数据库连接

    31.                   conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);

    32.                   if (!isValidConnection(conn)) { // 因为可能执行失败是数据库连接异常,所以判断一次,如果无效,重新获取数据库连接

    33.                       bedSoftTransaction.getConnection().release(conn);

    34.                       conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);

    35.                       isNewConnection = true;

    36.                   }

    37.                   preparedStatement = conn.prepareStatement(event.getSql());

    38.                   // 同步重试

    39.                   //TODO 对于批量事件需要解析成两层列表

    40.                   for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {

    41.                       preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));

    42.                   }

    43.                   preparedStatement.executeUpdate();

    44.                   deliverySuccess = true;

    45.                   // 同步重试成功,移除事务日志

    46.                   transactionLogStorage.remove(event.getId());

    47.               } catch (final SQLException ex) {

    48.                   log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);

    49.               } finally {

    50.                   close(isNewConnection, conn, preparedStatement);

    51.               }

    52.           }

    53.           return;

    54.       default:

    55.           throw new UnsupportedOperationException(event.getEventExecutionType().toString());

    56.   }

    57. }

    • BestEffortsDeliveryListener 通过 EventBus 实现监听 SQL 的执行。Sharding-JDBC 如何实现 EventBus 的,请看《Sharding-JDBC 源码分析 —— SQL 执行》

    • 调用 #isProcessContinuously() 方法判断是否处于最大努力送达型事务中,当且仅当处于该状态才进行监听事件处理

    • SQL 执行,插入事务日志

    • SQL 执行成功,移除事务日志

    • SQL 执行失败,根据柔性事务配置( SoftTransactionConfiguration )同步的事务送达的最大尝试次数( syncMaxDeliveryTryTimes )进行多次重试直到成功。总体逻辑和 RdbTransactionLogStorage#processData() 方法逻辑类似,区别在于获取分片数据库连接的特殊处理:此处调用失败,数据库连接可能是异常无效的,因此调用了 #isValidConnection() 判断连接的有效性。若无效,则重新获取分片数据库连接。另外,若是重新获取分片数据库连接,需要进行关闭释放 ( Connection#close()):

    1. // BestEffortsDeliveryListener.java

    2. /**

    3. * 通过 SELECT 1 校验数据库连接是否有效

    4. *

    5. * @param conn 数据库连接

    6. * @return 是否有效

    7. */

    8. private boolean isValidConnection(final Connection conn) {

    9.   try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) {

    10.       try (ResultSet rs = preparedStatement.executeQuery()) {

    11.           return rs.next() && 1 == rs.getInt("1");

    12.       }

    13.   } catch (final SQLException ex) {

    14.       return false;

    15.   }

    16. }

    17. /**

    18. * 关闭释放预编译SQL对象和数据库连接

    19. *

    20. * @param isNewConnection 是否新创建的数据库连接,是的情况下才释放

    21. * @param conn 数据库连接

    22. * @param preparedStatement 预编译SQL

    23. */

    24. private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {

    25.   if (null != preparedStatement) {

    26.       try {

    27.           preparedStatement.close();

    28.       } catch (final SQLException ex) {

    29.           log.error("PreparedStatement closed error:", ex);

    30.       }

    31.   }

    32.   if (isNewConnection && null != conn) {

    33.       try {

    34.           conn.close();

    35.       } catch (final SQLException ex) {

    36.           log.error("Connection closed error:", ex);

    37.       }

    38.   }

    39. }

    6. 最大努力送达型异步作业

    当最大努力送达型事务监听器( BestEffortsDeliveryListener )多次同步重试失败后,交给最大努力送达型异步作业进行多次异步重试,并且多次执行有固定间隔

    Sharding-JDBC 提供了两个最大努力送达型异步作业实现:

    • NestedBestEffortsDeliveryJob :内嵌的最大努力送达型异步作业

    • BestEffortsDeliveryJob :最大努力送达型异步作业

    两者实现代码逻辑基本一致。前者相比后者,用于开发测试,去除对 Zookeeper 依赖,无法实现高可用,因此生产环境下不适合使用

    6.1 BestEffortsDeliveryJob

    BestEffortsDeliveryJob 所在 Maven 项目为 sharding-jdbc-transaction-async-job,基于当当开源的 Elastic-Job 实现。如下是官方对该 Maven 项目的简要说明:

    由于柔性事务采用异步尝试,需要部署独立的作业和Zookeeper。sharding-jdbc-transaction采用elastic-job实现的sharding-jdbc-transaction-async-job,通过简单配置即可启动高可用作业异步送达柔性事务,启动脚本为start.sh。

    BestEffortsDeliveryJob

    1. public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {

    2.    /**

    3.     * 最大努力送达型异步作业配置对象

    4.     */

    5.    @Setter

    6.    private BestEffortsDeliveryConfiguration bedConfig;

    7.    /**

    8.     * 事务日志存储器对象

    9.     */

    10.    @Setter

    11.    private TransactionLogStorage transactionLogStorage;

    12.    @Override

    13.    public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {

    14.        return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),

    15.            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());

    16.    }

    17.    @Override

    18.    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {

    19.        try (

    20.            Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {

    21.            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());

    22.        } catch (final SQLException | TransactionCompensationException ex) {

    23.            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1,

    24.                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));

    25.            return false;

    26.        }

    27.        return true;

    28.    }

    29.    @Override

    30.    public boolean isStreamingProcess() {

    31.        return false;

    32.    }

    33. }

    • 调用 #fetchData() 方法获取需要处理的事务日志 (TransactionLog),内部调用了 TransactionLogStorage#findEligibleTransactionLogs() 方法

    • 调用 #processData() 方法处理事务日志,重试执行失败的 SQL,内部调用了 TransactionLogStorage#processData()

    • #fetchData() 和 #processData() 调用是 Elastic-Job 控制的。每一轮定时调度,每条事务日志只执行一次。当超过最大异步调用次数后,该条事务日志不再处理,所以生产使用时,最好增加下相应监控超过最大异步重试次数的事务日志

    6.2 AsyncSoftTransactionJobConfiguration

    AsyncSoftTransactionJobConfiguration,异步柔性事务作业配置对象。

    1. public class AsyncSoftTransactionJobConfiguration {

    2.    /**

    3.     * 作业名称.

    4.     */

    5.    private String name = "bestEffortsDeliveryJob";

    6.    /**

    7.     * 触发作业的cron表达式.

    8.     */

    9.    private String cron = "0/5 * * * * ?";

    10.    /**

    11.     * 每次作业获取的事务日志最大数量.

    12.     */

    13.    private int transactionLogFetchDataCount = 100;

    14.    /**

    15.     * 事务送达的最大尝试次数.

    16.     */

    17.    private int maxDeliveryTryTimes = 3;

    18.    /**

    19.     * 执行事务的延迟毫秒数.

    20.     *

    21.     * <p>早于此间隔时间的入库事务才会被作业执行.</p>

    22.     */

    23.    private long maxDeliveryTryDelayMillis = 60  * 1000L;

    24. }

    6.3 Elastic-Job 是否必须?

    Sharding-JDBC 提供的最大努力送达型异步作业实现( BestEffortsDeliveryJob ),通过与 Elastic-Job 集成,可以很便捷并且有质量保证的高可用高性能使用。一部分团队,可能已经引入或自研了类似 Elastic-Job 的分布式作业中间件解决方案,每多一个中间件,就是多一个学习与运维成本。那么是否可以使用自己的分布式作业解决方案?答案是,可以的。参考 BestEffortsDeliveryJob 的实现,通过调用 TransactionLogStorage 来实现:

    1. // 伪代码(不考虑性能、异常)

    2. List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....);

    3. for (TransactionLog transactionLog : transactionLogs) {

    4.       transactionLogStorage.processData(conn, log, maxDeliveryTryTimes);

    5. }

    当然,个人还是很推荐 Elastic-Job。

    😈 笔者要开始写《Elastic-Job 源码分析》


    另外,如果有支持事务消息的分布式队列系统,可以通过 TransactionLogStorage 实现存储事务消息存储成消息。为什么要支持事务消息?如果 SQL 执行是成功的,需要回滚(删除)事务消息。

    7. 适用场景

    见《官方文档 - 事务支持》。

    8. 开发指南 & 开发示例

    见《官方文档 - 事务支持》。

    666. 彩蛋

    哈哈哈

    算是坚持把这个系列写完了,给自己 32 個赞。

    满足!

    《Elastic-Job 源码分析》 走起!不 High 不结束!


      您可能也对以下帖子感兴趣

      文章有问题?点此查看未经处理的缓存