查看原文
其他

分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心监听器

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. ListenerManager

  • 3. AbstractListenerManager

  • 4. AbstractJobListener

  • 5. RegistryCenterConnectionStateListener

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 注册中心监听器

建议前置阅读:

  • 《Elastic-Job-Lite 源码分析 —— 注册中心》

涉及到主要类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦  
同理,开源项目贡献者会因为 Star 而更加有动力  
为 Elastic-Job 点赞!传送门

2. ListenerManager

ListenerManager,作业注册中心的监听器管理者。管理者两类组件:

  • 监听管理器

  • 注册中心连接状态监听器

其中监听管理器管理着自己的作业注册中心监听器。

一起从代码层面看看:

public final class ListenerManager {

    private final JobNodeStorage jobNodeStorage;

    private final ElectionListenerManager electionListenerManager;

    private final ShardingListenerManager shardingListenerManager;

    private final FailoverListenerManager failoverListenerManager;

    private final MonitorExecutionListenerManager monitorExecutionListenerManager;

    private final ShutdownListenerManager shutdownListenerManager;

    private final TriggerListenerManager triggerListenerManager;

    private final RescheduleListenerManager rescheduleListenerManager;

    private final GuaranteeListenerManager guaranteeListenerManager;

    private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;
}
  • 第一类:electionListenerManager / shardingListenerManager / failoverListenerManager / MonitorExecutionListenerManager / shutdownListenerManager / triggerListenerManager / rescheduleListenerManager / guaranteeListenerManager 是不同服务的监听管理器,都继承作业注册中心的监听器管理者的抽象类( AbstractListenerManager )。我们以下一篇文章会涉及到的分片监听管理器( ShardingListenerManager ) 来瞅瞅内部整体实现:

    public final class ShardingListenerManager extends AbstractListenerManager {
        @Override
        public void start() {
            addDataListener(new ShardingTotalCountChangedJobListener());
            addDataListener(new ListenServersChangedJobListener());
        }class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        // .... 省略方法
    }

    class ListenServersChangedJobListener extends AbstractJobListener {
        // .... 省略方法
    }
    }
    • ShardingListenerManager 内部管理了 ShardingTotalCountChangedJobListener / ListenServersChangedJobListener 两个作业注册中心监听器。具体作业注册中心监听器是什么,有什么用途,下文会详细解析。

  • 第二类:regCenterConnectionStateListener 是注册中心连接状态监听器。下文也会详细解析。

在《Elastic-Job-Lite 源码分析 —— 作业初始化》「3.2.4」注册作业启动信息,我们看到作业初始化时,会开启所有注册中心监听器:

// SchedulerFacade.java
/**
* 注册作业启动信息.

* @param enabled 作业是否启用
*/

public void registerStartUpInfo(final boolean enabled) {
   // 开启 所有监听器
   listenerManager.startAllListeners();
   // .... 省略方法
}

// ListenerManager.java
/**
* 开启所有监听器.
*/

public void startAllListeners() {
   // 开启 不同服务监听管理器
   electionListenerManager.start();
   shardingListenerManager.start();
   failoverListenerManager.start();
   monitorExecutionListenerManager.start();
   shutdownListenerManager.start();
   triggerListenerManager.start();
   rescheduleListenerManager.start();
   guaranteeListenerManager.start();
   // 开启 注册中心连接状态监听器
   jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

3. AbstractListenerManager

AbstractListenerManager,作业注册中心的监听器管理者的抽象类

public abstract class AbstractListenerManager {

    private final JobNodeStorage jobNodeStorage;

    protected AbstractListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) {
        jobNodeStorage = new JobNodeStorage(regCenter, jobName);
    }

    /**
     * 开启监听器.
     */

    public abstract void start();

    /**
     * 添加注册中心监听器
     *
     * @param listener 注册中心监听器
     */

    protected void addDataListener(final TreeCacheListener listener) {
        jobNodeStorage.addDataListener(listener);
    }
}
  • #addDataListener(),将作业注册中心的监听器添加到注册中心 TreeCache 的监听者里。JobNodeStorage#addDataListener(…) 在《Elastic-Job-Lite 源码分析 —— 作业初始化》「2.2」缓存已经详细解析。

  • 子类实现 #start() 方法实现监听器初始化。目前所有子类的实现都是将自己管理的注册中心监听器调用 #addDataListener(...),还是以 ShardingListenerManager 举例子:

    public final class ShardingListenerManager extends AbstractListenerManager {@Override
    public void start() {
        addDataListener(new ShardingTotalCountChangedJobListener());
        addDataListener(new ListenServersChangedJobListener());
    }
    }

4. AbstractJobListener

AbstractJobListener,作业注册中心的监听器抽象类

public abstract class AbstractJobListener implements TreeCacheListener {

    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        // 忽略掉非数据变化的事件,例如 event.type 为 CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }

    /**
     * 节点数据变化
     *
     * @param path 节点路径
     * @param eventType 事件类型
     * @param data 数据
     */

    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
  • 作业注册中心的监听器实现类实现 #dataChanged(…),对节点数据变化进行处理。

  • #childEvent(…) 屏蔽掉非节点数据变化事件,例如:CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件,只处理 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 事件。

我们再拿 ShardingListenerManager 举例子:

public final class ShardingListenerManager extends AbstractListenerManager {

    class ShardingTotalCountChangedJobListener extends AbstractJobListener {

        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }

    class ListenServersChangedJobListener extends AbstractJobListener {

        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }

        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }

        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }

}
  • 在《Elastic-Job-Lite 源码解析 —— 任务分片》详细解析。

5. RegistryCenterConnectionStateListener

RegistryCenterConnectionStateListener,实现 Curator ConnectionStateListener 接口,注册中心连接状态监听器。

public final class RegistryCenterConnectionStateListener implements ConnectionStateListener {

    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) { // Zookeeper 连接终端 或 连接丢失
            // 暂停作业调度
            jobScheduleController.pauseJob();
        } else if (ConnectionState.RECONNECTED == newState) { // Zookeeper 重新连上
            // 持久化作业服务器上线信息
            serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
            // 持久化作业运行实例上线相关信息
            instanceService.persistOnline();
            // 清除本地分配的作业分片项运行中的标记
            executionService.clearRunningInfo(shardingService.getLocalShardingItems());
            // 恢复作业调度
            jobScheduleController.resumeJob();
        }
    }

}
  • 当注册中心连接 SUSPENDED 或 LOST 时,暂停本地作业调度:

    // JobScheduleController.java
    public synchronized void pauseJob() {
       try {
           if (!scheduler.isShutdown()) {
               scheduler.pauseAll();
           }
       } catch (final SchedulerException ex) {
           throw new JobSystemException(ex);
       }
    }
  • 当注册中心重新连接成功( RECONNECTED ),恢复本地作业调度:

    /**
    * 恢复作业.
    */

    public synchronized void resumeJob() {
      try {
          if (!scheduler.isShutdown()) {
              scheduler.resumeAll();
          }
      } catch (final SchedulerException ex) {
          throw new JobSystemException(ex);
      }
    }

666. 彩蛋

旁白君:芋道君,你又水更了!  
芋道君:是是是,是是是!

道友,赶紧上车,分享一波朋友圈!



如果你对 Dubbo / Netty 等等源码与原理感兴趣,欢迎加入我的知识星球一起交流。长按下方二维码噢


目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇

源码不易↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存