查看原文
其他

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 路由(二)之分库分表路由

2017-09-09 王文斌(芋艿) 芋道源码


本文主要基于 Sharding-JDBC 1.5.0 正式版

  • 1. 概述

  • 2. SQLRouteResult

  • 3. 路由策略 x 算法

  • 4. SQL 路由

  • 5. DatabaseHintSQLRouter

  • 6. ParsingSQLRouter

    • 6.1 SimpleRoutingEngine

    • 6.2 ComplexRoutingEngine

    • 6.3 CartesianRoutingEngine

    • 6.3 ParsingSQLRouter 主#route()

  • 666. 彩蛋


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

本文分享分表分库路由相关的实现。涉及内容如下:

  1. SQL 路由结果

  2. 路由策略 x 算法

  3. SQL 路由器

内容顺序如编号。

Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
登记吧,骚年!传送门

SQL 路由大体流程如下:

2. SQLRouteResult

经过 SQL解析SQL路由后,产生SQL路由结果,即 SQLRouteResult。根据路由结果,生成SQL执行SQL

  • sqlStatement :SQL语句对象,经过SQL解析的结果对象。

  • executionUnits :SQL最小执行单元集合。SQL执行时,执行每个单元。

  • generatedKeys :插入SQL语句生成的主键编号集合。目前不支持批量插入而使用集合的原因,猜测是为了未来支持批量插入做准备。

3. 路由策略 x 算法

ShardingStrategy,分片策略。目前支持两种分片:

分片资源:在分库策略里指的是库,在分表策略里指的是表。

【1】 计算静态分片(常用)

  1. // ShardingStrategy.java

  2. /**

  3. * 计算静态分片.

  4. * @param sqlType SQL语句的类型

  5. * @param availableTargetNames 所有的可用分片资源集合

  6. * @param shardingValues 分片值集合

  7. * @return 分库后指向的数据源名称集合

  8. */

  9. public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {

  10.   Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

  11.   if (shardingValues.isEmpty()) {

  12.       Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value."); // 插入不能有多资源对象

  13.       result.addAll(availableTargetNames);

  14.   } else {

  15.       result.addAll(doSharding(shardingValues, availableTargetNames));

  16.   }

  17.   return result;

  18. }

  19. /**

  20. * 插入SQL 是否插入多个分片

  21. * @param sqlType SQL类型

  22. * @param availableTargetNames 所有的可用分片资源集合

  23. * @return 是否

  24. */

  25. private boolean isInsertMultiple(final SQLType sqlType, final Collection<String> availableTargetNames) {

  26.   return SQLType.INSERT == sqlType && availableTargetNames.size() > 1;

  27. }  

  • 插入SQL 需要有片键值,否则无法判断单个分片资源。(Sharding-JDBC 目前仅支持单条记录插入)

【2】计算动态分片

  1. // ShardingStrategy.java

  2. /**

  3. * 计算动态分片.

  4. * @param shardingValues 分片值集合

  5. * @return 分库后指向的分片资源集合

  6. */

  7. public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {

  8.   Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); // 动态分片必须有分片值

  9.   Collection<String> availableTargetNames = Collections.emptyList();

  10.   Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

  11.   result.addAll(doSharding(shardingValues, availableTargetNames));

  12.   return result;

  13. }

  • 动态分片对应 TableRule.dynamic=true

  • 动态分片必须有分片值

😈 闷了,看起来两者没啥区别?答案在分片算法上。我们先看 #doSharding() 方法的实现。

  1. // ShardingStrategy.java

  2. /**

  3. * 计算分片

  4. * @param shardingValues 分片值集合

  5. * @param availableTargetNames 所有的可用分片资源集合

  6. * @return 分库后指向的分片资源集合

  7. */

  8. private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {

  9.   // 无片键

  10.   if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {

  11.       return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));

  12.   }

  13.   // 单片键

  14.   if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {

  15.       SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;

  16.       ShardingValue shardingValue = shardingValues.iterator().next();

  17.       switch (shardingValue.getType()) {

  18.           case SINGLE:

  19.               return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));

  20.           case LIST:

  21.               return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);

  22.           case RANGE:

  23.               return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);

  24.           default:

  25.               throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());

  26.       }

  27.   }

  28.   // 多片键

  29.   if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {

  30.       return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);

  31.   }

  32.   throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());

  33. }

  • 无分片键算法:对应 NoneKeyShardingAlgorithm 分片算法接口。

  1. public interface NoneKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {

  2.    String doSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);

  3. }

  • 单片键算法:对应 SingleKeyShardingAlgorithm 分片算法接口。

  1. public interface SingleKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {

  2.    String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);

  3.    Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);

  4.    Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);

  5. }

ShardingValueTypeSQL 操作符接口方法
SINGLE=#doEqualSharding()
LISTIN#doInSharding()
RANGEBETWEEN#doBetweenSharding()


    • 多片键算法:对应 MultipleKeysShardingAlgorithm 分片算法接口。






    1. public interface MultipleKeysShardingAlgorithm extends ShardingAlgorithm {

    2.    Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue<?>> shardingValues);

    3. }



    分片算法类结构如下:

    来看看 Sharding-JDBC 实现的无需分库的分片算法 NoneDatabaseShardingAlgorithm (NoneTableShardingAlgorithm 基本一模一样):

    1. public final class NoneDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<String>, MultipleKeysDatabaseShardingAlgorithm {

    2.    @Override

    3.    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {

    4.        return availableTargetNames;

    5.    }

    6.    @Override

    7.    public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {

    8.        return availableTargetNames.isEmpty() ? null : availableTargetNames.iterator().next();

    9.    }

    10.    @Override

    11.    public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {

    12.        return availableTargetNames;

    13.    }

    14.    @Override

    15.    public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {

    16.        return availableTargetNames;

    17.    }

    18. }

    • 一定要注意,NoneXXXXShardingAlgorithm 只适用于无分库/表的需求,否则会是错误的路由结果。例如, #doEqualSharding() 返回的是第一个分片资源。


    再来看测试目录下实现的余数基偶分表算法 ModuloTableShardingAlgorithm 的实现:

    1. // com.dangdang.ddframe.rdb.integrate.fixture.ModuloTableShardingAlgorithm.java

    2. public final class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {

    3.    @Override

    4.    public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {

    5.        for (String each : tableNames) {

    6.            if (each.endsWith(shardingValue.getValue() % 2 + "")) {

    7.                return each;

    8.            }

    9.        }

    10.        throw new UnsupportedOperationException();

    11.    }

    12.    @Override

    13.    public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {

    14.        Collection<String> result = new LinkedHashSet<>(tableNames.size());

    15.        for (Integer value : shardingValue.getValues()) {

    16.            for (String tableName : tableNames) {

    17.                if (tableName.endsWith(value % 2 + "")) {

    18.                    result.add(tableName);

    19.                }

    20.            }

    21.        }

    22.        return result;

    23.    }

    24.    @Override

    25.    public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {

    26.        Collection<String> result = new LinkedHashSet<>(tableNames.size());

    27.        Range<Integer> range = shardingValue.getValueRange();

    28.        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {

    29.            for (String each : tableNames) {

    30.                if (each.endsWith(i % 2 + "")) {

    31.                    result.add(each);

    32.                }

    33.            }

    34.        }

    35.        return result;

    36.    }

    37. }

    • 我们可以参考这个例子编写自己的分片算哟 👼。

    • 多片键分库算法接口实现例子:MultipleKeysModuloDatabaseShardingAlgorithm.java


    😈 来看看动态计算分片需要怎么实现分片算法。

    1. // com.dangdang.ddframe.rdb.integrate.fixture.SingleKeyDynamicModuloTableShardingAlgorithm.java

    2. public final class SingleKeyDynamicModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {

    3.    /**

    4.    * 表前缀

    5.    */

    6.    private final String tablePrefix;

    7.    @Override

    8.    public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {

    9.        return tablePrefix + shardingValue.getValue() % 10;

    10.    }

    11.    @Override

    12.    public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {

    13.        Collection<String> result = new LinkedHashSet<>(shardingValue.getValues().size());

    14.        for (Integer value : shardingValue.getValues()) {

    15.            result.add(tablePrefix + value % 10);

    16.        }

    17.        return result;

    18.    }

    19.    @Override

    20.    public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {

    21.        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());

    22.        Range<Integer> range = shardingValue.getValueRange();

    23.        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {

    24.            result.add(tablePrefix + i % 10);

    25.        }

    26.        return result;

    27.    }

    28. }

    • 骚年,是不是明白了一些?动态表无需把真实表配置到 TableRule,而是通过分片算法计算出真实表

    4. SQL 路由

    SQLRouter,SQL 路由器接口,共有两种实现:

    • DatabaseHintSQLRouter:通过提示且仅路由至数据库的SQL路由器

    • ParsingSQLRouter:需要解析的SQL路由器

    它们实现 #parse()进行SQL解析#route()进行SQL路由


    RoutingEngine,路由引擎接口,共有四种实现:

    • DatabaseHintRoutingEngine:基于数据库提示的路由引擎

    • SimpleRoutingEngine:简单路由引擎

    • CartesianRoutingEngine:笛卡尔积的库表路由

    • ComplexRoutingEngine:混合多库表路由引擎

    ComplexRoutingEngine 根据路由结果会转化成 SimpleRoutingEngine 或 ComplexRoutingEngine。下文会看相应源码。


    路由结果有两种:

    • RoutingResult:简单路由结果

    • CartesianRoutingResult:笛卡尔积路由结果

    从图中,我们已经能大概看到两者有什么区别,更具体的下文随源码一起分享。

    😈 SQLRouteResult 和 RoutingResult 有什么区别?

    • SQLRouteResult:整个SQL路由返回的路由结果

    • RoutingResult:RoutingEngine返回路由结果


    一下子看到这么多"对象",可能有点紧张。不要紧张,我们一起在整理下。

    路由器路由引擎路由结果
    DatabaseHintSQLRouterDatabaseHintRoutingEngineRoutingResult
    ParsingSQLRouterSimpleRoutingEngineRoutingResult
    ParsingSQLRouterCartesianRoutingEngineCartesianRoutingResult

    😈 逗比博主给大家解决了"对象",是不是应该分享朋友圈

    5. DatabaseHintSQLRouter

    DatabaseHintSQLRouter,基于数据库提示的路由引擎。路由器工厂 SQLRouterFactory 创建路由器时,判断到使用数据库提示( Hint ) 时,创建 DatabaseHintSQLRouter。

    1. // DatabaseHintRoutingEngine.java

    2. public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {

    3.   return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);

    4. }

    先来看下 HintManagerHolder、HintManager 部分相关的代码:

    1. // HintManagerHolder.java

    2. public final class HintManagerHolder {

    3.    /**

    4.     * HintManager 线程变量

    5.     */

    6.    private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>();

    7.    /**

    8.     * 判断是否当前只分库.

    9.     *

    10.     * @return 是否当前只分库.

    11.     */

    12.    public static boolean isDatabaseShardingOnly() {

    13.        return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly();

    14.    }

    15.    /**

    16.     * 清理线索分片管理器的本地线程持有者.

    17.     */

    18.    public static void clear() {

    19.        HINT_MANAGER_HOLDER.remove();

    20.    }

    21. }

    22. // HintManager.java

    23. public final class HintManager implements AutoCloseable {

    24.    /**

    25.     * 库分片值集合

    26.     */

    27.    private final Map<ShardingKey, ShardingValue<?>> databaseShardingValues = new HashMap<>();

    28.    /**

    29.     * 只做库分片

    30.     * {@link DatabaseHintRoutingEngine}

    31.     */

    32.    @Getter

    33.    private boolean databaseShardingOnly;

    34.    /**

    35.     * 获取线索分片管理器实例.

    36.     *

    37.     * @return 线索分片管理器实例

    38.     */

    39.    public static HintManager getInstance() {

    40.        HintManager result = new HintManager();

    41.        HintManagerHolder.setHintManager(result);

    42.        return result;

    43.    }

    44.    /**

    45.     * 设置分库分片值.

    46.     *

    47.     * <p>分片操作符为等号.该方法适用于只分库的场景</p>

    48.     *

    49.     * @param value 分片值

    50.     */

    51.    public void setDatabaseShardingValue(final Comparable<?> value) {

    52.        databaseShardingOnly = true;

    53.        addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME, value);

    54.    }

    55. }

    那么如果要使用 DatabaseHintSQLRouter,我们只需要 HintManager.getInstance().setDatabaseShardingValue(库分片值) 即可。这里有两点要注意下:

    • HintManager#getInstance(),每次获取到的都是的 HintManager,多次赋值需要小心。

    • HintManager#close(),使用完需要去清理,避免下个请求读到遗漏的线程变量。


    看看 DatabaseHintSQLRouter 的实现:

    1. // DatabaseHintSQLRouter.java

    2. @Override

    3. public SQLStatement parse(final String logicSQL, final int parametersSize) {

    4.   return new SQLJudgeEngine(logicSQL).judge(); // 只解析 SQL 类型

    5. }  

    6. @Override

    7. // TODO insert的SQL仍然需要解析自增主键

    8. public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {

    9.   Context context = MetricsContext.start("Route SQL");

    10.   SQLRouteResult result = new SQLRouteResult(sqlStatement);

    11.   // 路由

    12.   RoutingResult routingResult = new DatabaseHintRoutingEngine(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), sqlStatement.getType())

    13.           .route();

    14.   // SQL最小执行单元

    15.   for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {

    16.       result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), logicSQL));

    17.   }

    18.   MetricsContext.stop(context);

    19.   if (showSQL) {

    20.       SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);

    21.   }

    22.   return result;

    23. }

    • #parse() 只解析了 SQL 类型,即 SELECT / UPDATE / DELETE / INSERT 。

    • 使用的分库策略来自 ShardingRule,不是 TableRule,这个一定要留心。❓因为 SQL 未解析表名。因此,即使在 TableRule 设置了 actualTables 属性也是没有效果的。

    • 目前不支持 Sharding-JDBC 的主键自增。❓因为 SQL 未解析自增主键。从代码上的 TODO应该会支持。

    • HintManager.getInstance().setDatabaseShardingValue(库分片值) 设置的库分片值使用的是 EQUALS,因而分库策略计算出来的只有一个库分片,即 TableUnit 只有一个,SQLExecutionUnit 只有一个。


    看看 DatabaseHintSQLRouter 的实现:

    1. // DatabaseHintRoutingEngine.java

    2. @Override

    3. public RoutingResult route() {

    4.   // 从 Hint 获得 分片键值

    5.   Optional<ShardingValue<?>> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME));

    6.   Preconditions.checkState(shardingValue.isPresent());

    7.   log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValue.get());

    8.   // 路由。表分片规则使用的是 ShardingRule 里的。因为没 SQL 解析。

    9.   Collection<String> routingDataSources = databaseShardingStrategy.doStaticSharding(sqlType, dataSourceRule.getDataSourceNames(), Collections.<ShardingValue<?>>singleton(shardingValue.get()));

    10.   Preconditions.checkState(!routingDataSources.isEmpty(), "no database route info");

    11.   log.debug("After database sharding only result: {}", routingDataSources);

    12.   // 路由结果

    13.   RoutingResult result = new RoutingResult();

    14.   for (String each : routingDataSources) {

    15.       result.getTableUnits().getTableUnits().add(new TableUnit(each, "", ""));

    16.   }

    17.   return result;

    18. }

    • 调用 databaseShardingStrategy.doStaticSharding() 方法计算分片。

    • newTableUnit(each,"","") 的 logicTableName, actualTableName 都是空串,相信原因你已经知道。

    6. ParsingSQLRouter

    ParsingSQLRouter,需要解析的SQL路由器。

    ParsingSQLRouter 使用 SQLParsingEngine 解析SQL。对SQL解析有兴趣的同学可以看看拙作《Sharding-JDBC 源码分析 —— SQL 解析》。

    1. // ParsingSQLRouter.java

    2. public SQLStatement parse(final String logicSQL, final int parametersSize) {

    3.   SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);

    4.   Context context = MetricsContext.start("Parse SQL");

    5.   SQLStatement result = parsingEngine.parse();

    6.   if (result instanceof InsertStatement) {

    7.       ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);

    8.   }

    9.   MetricsContext.stop(context);

    10.   return result;

    11. }

    • #appendGenerateKeyToken() 会在《SQL 改写》分享


    ParsingSQLRouter 在路由时,会根据表情况使用 SimpleRoutingEngine 或 CartesianRoutingEngine 进行路由。

    1. private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {

    2.   Collection<String> tableNames = sqlStatement.getTables().getTableNames();

    3.   RoutingEngine routingEngine;

    4.   if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {

    5.       routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);

    6.   } else {

    7.       // TODO 可配置是否执行笛卡尔积

    8.       routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);

    9.   }

    10.   return routingEngine.route();

    11. }

    • 当只进行一张表或者多表互为BindingTable关系时,使用 SimpleRoutingEngine 简单路由引擎。多表互为BindingTable关系时,每张表的路由结果是相同的,所以只要计算第一张表的分片即可。

    • tableNames.iterator().next() 注意下, tableNames 变量是 newTreeMap<>(String.CASE_INSENSITIVE_ORDER)。所以 SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id 即使 t_order_item 排在 t_order 前面, tableNames.iterator().next() 返回的是 t_order。当 t_order 和 t_order_item 为 BindingTable关系 时,计算的是 t_order 路由分片。

    • BindingTable关系在 ShardingRule 的 tableRules 配置。配置该关系 TableRule 有如下需要遵守的规则:

      • 分片策略与算法相同

      • 数据源配置对象相同

      • 真实表数量相同

    举个例子

    • SQL : SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id

    • 分库分表情况:

    1. multi_db_multi_table_01

    2.  ├── t_order_0                        ├── t_order_item_01

    3.  └── t_order_1                        ├── t_order_item_02

    4.                                       ├── t_order_item_03

    5.                                       ├── t_order_item_04

    6. multi_db_multi_table_02

    7.  ├── t_order_0                        ├── t_order_item_01

    8.  └── t_order_1                        ├── t_order_item_02

    9.                                       ├── t_order_item_03

    10.                                       ├── t_order_item_04

    最终执行的SQL如下:

    1. SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id

    2. SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id

    3. SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id

    4. SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id

    • t_order_item_03、 t_order_item_04 无法被查询到。

    下面我们看看 #isAllBindingTables() 如何实现多表互为BindingTable关系

    1. // ShardingRule.java

    2. // 调用顺序 #isAllBindingTables()=>#filterAllBindingTables()=>#findBindingTableRule()=>#findBindingTableRule()

    3. /**

    4. * 判断逻辑表名称集合是否全部属于Binding表.

    5. * @param logicTables 逻辑表名称集合

    6. */

    7. public boolean isAllBindingTables(final Collection<String> logicTables) {

    8.   Collection<String> bindingTables = filterAllBindingTables(logicTables);

    9.   return !bindingTables.isEmpty() && bindingTables.containsAll(logicTables);

    10. }

    11. /**

    12. * 过滤出所有的Binding表名称.

    13. */

    14. public Collection<String> filterAllBindingTables(final Collection<String> logicTables) {

    15.   if (logicTables.isEmpty()) {

    16.       return Collections.emptyList();

    17.   }

    18.   Optional<BindingTableRule> bindingTableRule = findBindingTableRule(logicTables);

    19.   if (!bindingTableRule.isPresent()) {

    20.       return Collections.emptyList();

    21.   }

    22.   // 交集

    23.   Collection<String> result = new ArrayList<>(bindingTableRule.get().getAllLogicTables());

    24.   result.retainAll(logicTables);

    25.   return result;

    26. }

    27. /**

    28. * 获得包含<strong>任一</strong>在逻辑表名称集合的binding表配置的逻辑表名称集合

    29. */

    30. private Optional<BindingTableRule> findBindingTableRule(final Collection<String> logicTables) {

    31.   for (String each : logicTables) {

    32.       Optional<BindingTableRule> result = findBindingTableRule(each);

    33.       if (result.isPresent()) {

    34.           return result;

    35.       }

    36.   }

    37.   return Optional.absent();

    38. }

    39. /**

    40. * 根据逻辑表名称获取binding表配置的逻辑表名称集合.

    41. */

    42. public Optional<BindingTableRule> findBindingTableRule(final String logicTable) {

    43.   for (BindingTableRule each : bindingTableRules) {

    44.       if (each.hasLogicTable(logicTable)) {

    45.           return Optional.of(each);

    46.       }

    47.   }

    48.   return Optional.absent();

    49. }

    • 逻辑看起来比较长,目的是找到一条 BindingTableRule 包含所有逻辑表集合

    • 不支持《传递关系》:配置 BindingTableRule 时,相同绑定关系一定要配置在一条,必须是 [a,b,c],而不能是 [a,b],[b,c]

    6.1 SimpleRoutingEngine

    SimpleRoutingEngine,简单路由引擎。

    1. // SimpleRoutingEngine.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • 可以使用 HintManager 设置分片值进行强制路由

    • #getShardingValues() 我们看到了《SQL 解析(二)之SQL解析》分享的 Condition 对象。之前我们提到过Parser 半理解SQL的目的之一是:提炼分片上下文,此处即是该目的的体现。Condition 里只放明确影响路由的条件,例如: order_id=1order_id IN(1,2)order_id BETWEEN(1,3),不放无法计算的条件,例如: o.order_id=i.order_id。该方法里,使用分片键从 Condition 查找 分片值。🙂 是不是对 Condition 的认识更加清晰一丢丢落。

    1. // SimpleRoutingEngine.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • 可以使用 HintManager 设置分片值进行强制路由

    • 根据 dynamic 属性来判断调用 #doDynamicSharding() 还是 #doStaticSharding() 计算分片。

    1. // SimpleRoutingEngine.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • 在 SimpleRoutingEngine 只生成了当前表的 TableUnits。如果存在与其互为BindingTable关系的表的 TableUnits 怎么获得?你可以想想噢,当然在后文《SQL 改写》也会给出答案,看看和你想的是否一样。

    6.2 ComplexRoutingEngine

    ComplexRoutingEngine,混合多库表路由引擎。

    1. // ComplexRoutingEngine.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • ComplexRoutingEngine 计算每个逻辑表的简单路由分片,路由结果交给 CartesianRoutingEngine 继续路由形成笛卡尔积结果。

    • 由于目前 ComplexRoutingEngine 路由前已经判断全部表互为 BindingTable 关系,因而不会出现 result.size==1,属于防御性编程。

    • 部分表互为 BindingTable 关系时,ComplexRoutingEngine 不重复计算分片。

    6.3 CartesianRoutingEngine

    CartesianRoutingEngine,笛卡尔积的库表路由。

    实现逻辑上相对复杂,请保持耐心哟,😈 其实目的就是实现连连看的效果:

    • RoutingResult[0] x RoutingResult[1] …… x RoutingResult[n- 1] x RoutingResult[n]

    • 同库 才可以进行笛卡尔积

    1. // CartesianRoutingEngine.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • 第一步,获得同库对应的逻辑表集合,即 Entry<数据源(库), Set<逻辑表>> entry

    • 第二步,遍历数据源(库),获得当前数据源(库)路由表单元分组

    • 第三步,对路由表单元分组进行笛卡尔积,并合并到路由结果。

    下面,我们一起逐步看看代码实现。

    • SQL : SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id

    • 分库分表情况:

    1. multi_db_multi_table_01

    2.  ├── t_order_0                        ├── t_order_item_01

    3.  └── t_order_1                        ├── t_order_item_02

    4. multi_db_multi_table_02

    5.  ├── t_order_0                        ├── t_order_item_01

    6.  └── t_order_1                        ├── t_order_item_02

    1. // 第一步

    2. // CartesianRoutingEngine.java

    3. /**

    4. * 获得同库对应的逻辑表集合

    5. */

    6. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • #getDataSourceLogicTablesMap() 返回如图:


    1. // 第二步

    2. // CartesianRoutingEngine.java

    3. private List<Set<String>> getActualTableGroups(final String dataSource, final Set<String> logicTables) {

    4.   List<Set<String>> result = new ArrayList<>(logicTables.size());

    5.   for (RoutingResult each : routingResults) {

    6.       result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables));

    7.   }

    8.   return result;

    9. }

    10. private List<Set<TableUnit>> toTableUnitGroups(final String dataSource, final List<Set<String>> actualTableGroups) {

    11.   List<Set<TableUnit>> result = new ArrayList<>(actualTableGroups.size());

    12.   for (Set<String> each : actualTableGroups) {

    13.       result.add(new HashSet<>(Lists.transform(new ArrayList<>(each), new Function<String, TableUnit>() {

    14.           @Override

    15.           public TableUnit apply(final String input) {

    16.               return findTableUnit(dataSource, input);

    17.           }

    18.       })));

    19.   }

    20.   return result;

    21. }

    • #getActualTableGroups() 返回如图:

    • #toTableUnitGroups() 返回如图:


    1. // CartesianRoutingEngine.java

    2. private List<CartesianTableReference> getCartesianTableReferences(final Set<List<TableUnit>> cartesianTableUnitGroups) {

    3.   List<CartesianTableReference> result = new ArrayList<>(cartesianTableUnitGroups.size());

    4.   for (List<TableUnit> each : cartesianTableUnitGroups) {

    5.       result.add(new CartesianTableReference(each));

    6.   }

    7.   return result;

    8. }

    9. // CartesianRoutingResult.java

    10. @Getter

    11. private final List<CartesianDataSource> routingDataSources = new ArrayList<>();

    12. void merge(final String dataSource, final Collection<CartesianTableReference> routingTableReferences) {

    13.   for (CartesianTableReference each : routingTableReferences) {

    14.       merge(dataSource, each);

    15.   }

    16. }

    17. private void merge(final String dataSource, final CartesianTableReference routingTableReference) {

    18.   for (CartesianDataSource each : routingDataSources) {

    19.       if (each.getDataSource().equalsIgnoreCase(dataSource)) {

    20.           each.getRoutingTableReferences().add(routingTableReference);

    21.           return;

    22.       }

    23.   }

    24.   routingDataSources.add(new CartesianDataSource(dataSource, routingTableReference));

    25. }

    • Sets.cartesianProduct(tableUnitGroups) 返回如图(Guava 工具库真强大):


    • #getCartesianTableReferences() 返回如图:

      CartesianTableReference,笛卡尔积表路由组,包含多条 TableUnit,即 TableUnit[0] x TableUnit[1] …… x TableUnit[n]。例如图中: t_order_01 x t_order_item_02,最终转换成 SQL 为 SELECT*FROM t_order_01 o join t_order_item_02 i ON o.order_id=i.order_id


    • #merge() 合并笛卡尔积路由结果。CartesianRoutingResult 包含多个 CartesianDataSource,因此需要将 CartesianTableReference 合并(添加)到对应的 CartesianDataSource。当然,目前在实现时已经是按照数据源(库)生成对应的 CartesianTableReference。


    6.4 ParsingSQLRouter 主#route()

    1. // ParsingSQLRouter.java

    2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。

    • RoutingResultroutingResult=route(parameters,sqlStatement); 调用的就是上文分析的 SimpleRoutingEngine、ComplexRoutingEngine、CartesianRoutingEngine 的 #route() 方法。

    • #processGeneratedKey()、 #processLimit()、 #rewrite()、 #generateSQL() 等会放在《SQL 改写》 分享。

    666. 彩蛋

    篇幅有些长,希望能让大家对路由有比较完整的认识。
    如果内容有错误,烦请您指正,我会认真修改。
    如果表述不清晰,不太理解的,欢迎加我微信(wangwenbin-server)一起探讨。

    谢谢你技术这么好,还耐心看完了本文。

    强制路由 HintManager 讲的相对略过,可以看如下内容进一步了解:

    1. 《官方文档-强制路由》

    2. HintManager.java 源码

    厚着脸皮,道友,辛苦分享朋友圈可好?!


      您可能也对以下帖子感兴趣

      文章有问题?点此查看未经处理的缓存