查看原文
其他

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监控服务

芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-monitor/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. MonitorService


1. 概述

本文主要分享 Elastic-Job-Lite 作业监控服务。内容对应《官方文档 —— DUMP作业运行信息》。

使用Elastic-Job-Lite过程中可能会碰到一些分布式问题,导致作业运行不稳定。 
由于无法在生产环境调试,通过dump命令可以把作业内部相关信息dump出来,方便开发者debug分析; 另外为了不泄露隐私,已将相关信息中的ip地址以ip1, ip2…的形式过滤,可以在互联网上公开传输环境信息,便于进一步完善Elastic-Job。

涉及到主要类的类图如下( 打开大图 ):

  • 在 Elastic-Job-lite 里,作业监控服务( MonitorService ) 实现了DUMP作业运行信息功能。

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. MonitorService

MonitorService,作业监控服务。

初始化 MonitorService 方法实现如下

// MonitorService.java
private final String jobName;

public void listen() {
   int port = configService.load(true).getMonitorPort();
   if (port < 0) {
       return;
   }
   try {
       log.info("Elastic job: Monitor service is running, the port is '{}'", port);
       openSocketForMonitor(port);
   } catch (final IOException ex) {
       log.error("Elastic job: Monitor service listen failure, error is: ", ex);
   }
}

private void openSocketForMonitor(final int port) throws IOException {
   serverSocket = new ServerSocket(port);
   new Thread() {

       @Override
       public void run() {
           while (!closed) {
               try {
                   process(serverSocket.accept());
               } catch (final IOException ex) {
                   log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", ex);
               }
           }
       }
   }.start();
}
  • 在作业配置的监控服务端口属性( LiteJobConfiguration.monitorPort )启动 ServerSocket。一个作业对应一个作业监控端口,所以配置时,请不要重复端口噢。

处理 dump命令 方法如下

// MonitorService.java
private void process(final Socket socket) throws IOException {
   try (
           BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
           BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
           Socket autoCloseSocket = socket) {
       // 读取命令
       String cmdLine = reader.readLine();
       if (null != cmdLine && DUMP_COMMAND.equalsIgnoreCase(cmdLine)) { // DUMP
           List<String> result = new ArrayList<>();
           dumpDirectly("/" + jobName, result);
           outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");
       }
   }
}
  • #process() 方法,目前只支持 DUMP 命令。如果你有自定义命令的需要,可以拓展该方法。

  • 调用 #dumpDirectly() 方法,输出当前作业名对应的相关调试信息。

    private void dumpDirectly(final String path, final List<String> result) {
       for (String each : regCenter.getChildrenKeys(path)) {
           String zkPath = path + "/" + each;
           String zkValue = regCenter.get(zkPath);
           if (null == zkValue) {
               zkValue = "";
           }
           TreeCache treeCache = (TreeCache) regCenter.getRawCache("/" + jobName);
           ChildData treeCacheData = treeCache.getCurrentData(zkPath);
           String treeCachePath =  null == treeCacheData ? "" : treeCacheData.getPath();
           String treeCacheValue = null == treeCacheData ? "" : new String(treeCacheData.getData());
           // 判断 TreeCache缓存 和 注册中心 数据一致
           if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) {
               result.add(Joiner.on(" | ").join(zkPath, zkValue));
           } else {
               result.add(Joiner.on(" | ").join(zkPath, zkValue, treeCachePath, treeCacheValue));
           }
           // 递归
           dumpDirectly(zkPath, result);
       }
    }
    • 当作业本地 TreeCache缓存 和注册中心数据不一致时,DUMP 出 [zkPath, zkValue, treeCachePath, treeCacheValue]。当相同时,只需 DUMP 出 [zkPath, zkValue],方便看出本地和注册中心是否存在数据差异。

  • DUMP 信息例子如下:

    Yunai-MacdeMacBook-Pro-2:elastic-job yunai$ echo "dump" | nc 127.0.0.1 10024
    /javaSimpleJob/sharding | 
    /javaSimpleJob/sharding/2 | 
    /javaSimpleJob/sharding/2/instance | ip198@-@5100
    /javaSimpleJob/sharding/1 | 
    /javaSimpleJob/sharding/1/instance | ip198@-@5100
    /javaSimpleJob/sharding/0 | 
    /javaSimpleJob/sharding/0/instance | ip198@-@5100
    /javaSimpleJob/servers | 
    /javaSimpleJob/servers/ip2 | 
    /javaSimpleJob/servers/ip198 | 
    /javaSimpleJob/leader | 
    /javaSimpleJob/leader/sharding | 
    /javaSimpleJob/leader/failover | 
    /javaSimpleJob/leader/failover/latch | 
    /javaSimpleJob/leader/failover/items | 
    /javaSimpleJob/leader/election | 
    /javaSimpleJob/leader/election/latch | 
    /javaSimpleJob/leader/election/instance | ip198@-@5100
    /javaSimpleJob/instances | 
    /javaSimpleJob/instances/ip198@-@5100 | 
    /javaSimpleJob/config | {"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0 0/2 * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":false,"maxTimeDiffSeconds":-1,"monitorPort":10024,"jobShardingStrategyClass":"com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}




欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

  • 《精尽 Dubbo 源码解析系列》69 篇。

  • 《精尽 Netty 源码解析系列》61 篇。

  • 《精尽 Spring 源码解析系列》35 篇。

  • 《精尽 Spring MVC 源码解析系列》24 篇。

  • 《精尽 MyBatis 源码解析系列》34 篇。

  • 《数据库实体设计》17 篇。

  • 《精尽面试题》6 篇。持续更新...

  • 《精尽学习指南》6 篇。持续更新...


目前在知识星球更新了《精尽面试题》目录如下:

01. Dubbo 面试题

02. Netty 面试题

03. Spring 面试题

04. Spring MVC 面试题

05. Spring Boot 面试题

06. MyBatis 面试题


目前在知识星球更新了《精尽学习指南》目录如下:

01. Dubbo 学习指南

02. Netty 学习指南

03. Spring 学习指南

04. Spring MVC 学习指南

05. Spring Boot 学习指南

06. MyBatis 学习指南


目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇


目前在知识星球更新了《Spring 源码解析》目录如下:


01. 调试环境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 载入

04. IoC BeanDefinition 注册

05. IoC Bean 获取

06. IoC Bean 生命周期

... 一共 35+ 篇


目前在知识星球更新了《Spring MVC 源码解析》目录如下:


01. Spring MVC 面试题
02. Spring MVC 学习指南
03. 调试环境搭建
04. 容器的初始化
05. 组件一览
06. 请求处理一览
07. HandlerMapping 组件
08. HandlerAdapter 组件
09. HandlerExceptionResolver 组件
10. RequestToViewNameTranslator 组件
11. LocaleResolver 组件
12. ThemeResolver 组件
13. ViewResolver 组件

14. MultipartResolver 组件

15. FlashMapManager 组件

... 一共 24+ 篇


目前在知识星球更新了《MyBatis 源码解析》目录如下:


01. 调试环境搭建
02. 项目结构一览
03. MyBatis 面试题合集

04. MyBatis 学习资料合集

05. MyBatis 初始化

06. SQL 初始化

07. SQL 执行

08. 插件体系

09. Spring 集成

... 一共 34+ 篇


源码不易↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存