查看原文
其他

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-storage/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. JobNodePath

  • 3. JobNodeStorage

  • 4. ConfigurationNode

  • 5. ServerNode

  • 6. InstanceNode

  • 7. ShardingNode

  • 8. LeaderNode

  • 9. FailoverNode

  • 10. GuaranteeNode

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作业数据存储

涉及到主要类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦  
同理,开源项目贡献者会因为 Star 而更加有动力  
为 Elastic-Job 点赞!传送门

2. JobNodePath

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀

在 Zookeeper 看一个作业的数据存储:

[zk: localhost:2181(CONNECTED) 65] ls /elastic-job-example-lite-java/javaSimpleJob
[leader, servers, config, instances, sharding]
  • elastic-job-example-lite-java:作业节点集群名,使用 ZookeeperConfiguration.namespace 属性配置。

  • javaSimpleJob:作业名字,使用 JobCoreConfiguration.jobName 属性配置。

  • config / servers / instances / sharding / leader:不同服务的数据存储节点路径。

JobNodePath,注释很易懂,点击链接查看。这里我们梳理下 JobNodePath 和其它节点路径类的关系:

Zookeeper 路径JobNodePath 静态属性JobNodePath 方法节点路径类
configCONFIG_NODE#getConfigNodePath()ConfigurationNode
serversSERVERS_NODE#getServerNodePath()ServerNode
instancesINSTANCES_NODE#getInstancesNodePath()InstanceNode
shardingSHARDING_NODE#getShardingNodePath()ShardingNode
leader/#getFullPath(node)LeaderNode
leader/failover/#getFullPath(node)FailoverNode
guarantee/#getFullPath(node)GuaranteeNode

3. JobNodeStorage

JobNodeStorage,作业节点数据访问类。

Elastic-Job-Lite 使用注册中心存储作业节点数据,JobNodeStorage 对注册中心提供的方法做下简单的封装提供调用。举个例子:

// JobNodeStorage.java
private final CoordinatorRegistryCenter regCenter;
private final JobNodePath jobNodePath;

/**
* 判断作业节点是否存在.

* @param node 作业节点名称
* @return 作业节点是否存在
*/

public boolean isJobNodeExisted(final String node) {
   return regCenter.isExisted(jobNodePath.getFullPath(node));
}

// JobNodePath.java
/**
* 获取节点全路径.

* @param node 节点名称
* @return 节点全路径
*/

public String getFullPath(final String node) {
   return String.format("/%s/%s", jobName, node);
}
  • 传递的参数 node 只是简单的作业节点名称,通过调用 JobNodePath#getFullPath(…) 方法获取节点全路径。

  • 其它方法类似,有兴趣的同学点击链接查看。

4. ConfigurationNode

ConfigurationNode,配置节点路径。

在 Zookeeper 看一个作业的配置节点数据存储:

[zk: localhost:2181(CONNECTED) 67] get /elastic-job-example-lite-java/javaSimpleJob/config
{"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0/5 * * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
  • /config 持久节点,存储Lite作业配置( LiteJobConfiguration ) JSON化字符串。

ConfigurationNode 代码如下:

public final class ConfigurationNode {

    static final String ROOT = "config";
}

ConfigurationNode 如何读取、存储,在《Elastic-Job-Lite 源码分析 —— 作业配置》的「3.」作业配置服务已经详细解析。

5. ServerNode

ServerNode,服务器节点路径。

在 Zookeeper 看一个作业的服务器节点数据存储: 

[zk: localhost:2181(CONNECTED) 72] ls /elastic-job-example-lite-java/javaSimpleJob/servers
[192.168.16.164, 169.254.93.156, 192.168.252.57, 192.168.16.137, 192.168.3.2, 192.168.43.31]
[zk: localhost:2181(CONNECTED) 73] get /elastic-job-example-lite-java/javaSimpleJob/servers/192.168.16.164
  • /servers/ 目录下以 IP 为数据节点路径存储每个服务器节点。如果相同IP服务器有多个服务器节点,只存储一个 IP 数据节点。

  • /servers/${IP} 持久节点,不存储任何信息,只是空串( "");

ServerNode 代码如下:

public final class ServerNode {

    /**
     * 服务器信息根节点.
     */

    public static final String ROOT = "servers";

    private static final String SERVERS = ROOT + "/%s";
}

ServerNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

6. InstanceNode

InstanceNode,运行实例节点路径。

在 Zookeeper 看一个作业的运行实例节点数据存储: 

[zk: localhost:2181(CONNECTED) 81] ls /elastic-job-example-lite-java/javaSimpleJob/instances
[192.168.16.137@-@56010]
[zk: localhost:2181(CONNECTED) 82] get /elastic-job-example-lite-java/javaSimpleJob/instances
  • /instances 目录下以作业实例主键( JOB_INSTANCE_ID ) 为数据节点路径存储每个运行实例节点。

  • /instances/${JOB_INSTANCE_ID} 临时节点,不存储任何信息,只是空串( "");

  • JOB_INSTANCE_ID 生成方式:

    // JobInstance.java

    jobInstanceId = IpUtils.getIp()
                    + DELIMITER
                    + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID

InstanceNode 代码如下:

public final class InstanceNode {

    /**
     * 运行实例信息根节点.
     */

    public static final String ROOT = "instances";

    private static final String INSTANCES = ROOT + "/%s";

    /**
     * 获取当前运行实例节点路径
     *
     * @return 当前运行实例节点路径
     */

    String getLocalInstanceNode() {
        return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }
}

InstanceNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

7. ShardingNode

ShardingNode,分片节点路径。

在 Zookeeper 看一个作业的分片节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/sharding
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/sharding/0
[running, instance, misfire]
[zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.16.137@-@56010
  • /sharding/${ITEM_ID} 目录下以作业分片项序号( ITEM_ID ) 为数据节点路径存储作业分片项的 instance / running / misfire / disable 数据节点信息。

  • /sharding/${ITEM_ID}/instance 临时节点,存储该作业分片项分配到的作业实例主键( JOB_INSTANCE_ID )。在《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。

  • /sharding/${ITEM_ID}/running 临时节点,当该作业分片项正在运行,存储空串( "" );当该作业分片项不在运行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.6」执行普通触发的作业已经详细解析。

  • /sharding/${ITEM_ID}/misfire 永久节点,当该作业分片项被错过执行,存储空串( "" );当该作业分片项重新执行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.7」执行被错过触发的作业已经详细解析。

  • /sharding/${ITEM_ID}/disable 永久节点,当该作业分片项被禁用,存储空串( "" );当该作业分片项被开启,移除数据节点。

ShardingNode,代码如下:

public final class ShardingNode {

    /**
     * 执行状态根节点.
     */

    public static final String ROOT = "sharding";

    static final String INSTANCE_APPENDIX = "instance";

    public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;

    static final String RUNNING_APPENDIX = "running";

    static final String RUNNING = ROOT + "/%s/" + RUNNING_APPENDIX;

    static final String MISFIRE = ROOT + "/%s/misfire";

    static final String DISABLED = ROOT + "/%s/disabled";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + ROOT;

    static final String NECESSARY = LEADER_ROOT + "/necessary";

    static final String PROCESSING = LEADER_ROOT + "/processing";
}
  • LEADER_ROOT / NECESSARY / PROCESSING 放在「4.7」LeaderNode 解析。

8. LeaderNode

LeaderNode,主节点路径。

 leader 目录下一共有三个存储子节点:

  • election:主节点选举。

  • sharding:作业分片项分配。

  • failover:作业失效转移。

主节点选举

在 Zookeeper 看一个作业的 leader/election 节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/election
[latch, instance]
[zk: localhost:2181(CONNECTED) 2] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
192.168.16.137@-@1910
  • /leader/election/instance 临时节点,当作业集群完成选举后,存储主作业实例主键( JOB_INSTANCE_ID )。

  • /leader/election/latch 主节点选举分布式锁,是 Apache Curator 针对 Zookeeper 实现的分布式锁的一种,笔者暂未了解存储形式,无法解释。在《Elastic-Job-Lite 源码分析 —— 注册中心》的「3.1」在主节点执行操作进行了简单解析。

作业分片项分配

在 Zookeeper 看一个作业的 leader/sharding 节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[necessary, processing]
[zk: localhost:2181(CONNECTED) 2] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding

[zk: localhost:2181(CONNECTED) 3] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/processing
  • /leader/sharding/necessary 永久节点,当相同作业有新的作业节点加入或者移除时,存储空串( ""),标记需要进行作业分片项重新分配;当重新分配完成后,移除该数据节点。

  • /leader/sharding/processing 临时节点,当开始重新分配作业分片项时,存储空串( "" ),标记正在进行重新分配;当重新分配完成后,移除该数据节点。

  • 当且仅当作业节点为主节点时,才可以执行作业分片项分配,《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。

作业失效转移

作业失效转移数据节点在 FailoverNode,放在「9」FailoverNode 解析。

这里大家可能会和我一样比较疑惑,为什么 /leader/failover 放在 /leader 目录下,而不独立成为一个根目录?经过确认,作业失效转移 设计到分布式锁,统一存储在 /leader 目录下。


LeaderNode,代码如下:

public final class LeaderNode {

    /**
     * 主节点根路径.
     */

    public static final String ROOT = "leader";

    static final String ELECTION_ROOT = ROOT + "/election";

    static final String INSTANCE = ELECTION_ROOT + "/instance";

    static final String  LATCH = ELECTION_ROOT + "/latch";
}

9. FailoverNode

FailoverNode,失效转移节点路径。

在 Zookeeper 看一个作业的失效转移节点数据存储: 

[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover
[latch, items]
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover/items
[0]
  • /leader/failover/latch 作业失效转移分布式锁,和 /leader/failover/latch 是一致的。

  • /leader/items/${ITEM_ID} 永久节点,当某台作业节点 CRASH 时,其分配的作业分片项标记需要进行失效转移,存储其分配的作业分片项的 /leader/items/${ITEM_ID} 为空串( "" );当失效转移标记,移除 /leader/items/${ITEM_ID},存储 /sharding/${ITEM_ID}/failover 为空串( "" ),临时节点,需要进行失效转移执行。《Elastic-Job-Lite 源码分析 —— 作业失效转移》详细解析。

FailoverNode 代码如下:

public final class FailoverNode {

    static final String FAILOVER = "failover";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + FAILOVER;

    static final String ITEMS_ROOT = LEADER_ROOT + "/items";

    static final String ITEMS = ITEMS_ROOT + "/%s";

    static final String LATCH = LEADER_ROOT + "/latch";

    private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;

    static String getItemsNode(final int item) {
        return String.format(ITEMS, item);
    }

    static String getExecutionFailoverNode(final int item) {
        return String.format(EXECUTION_FAILOVER, item);
    }
}

10. GuaranteeNode

GuaranteeNode,保证分布式任务全部开始和结束状态节点路径。在《Elastic-Job-Lite 源码分析 —— 作业监听器》详细解析。

666. 彩蛋

旁白君:芋道君,你又水更了!  
芋道君:屁屁屁,劳资怼死你!如下是作业数据存储整理,哼哼哈兮!

道友,赶紧上车,分享一波朋友圈!




如果你对 Dubbo / Netty 等等源码与原理感兴趣,欢迎加入我的知识星球一起交流。长按下方二维码噢


目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇

源码不易↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存