查看原文
其他

熔断器 Hystrix 源码解析 —— 执行命令方式

芋艿 芋道源码 2019-05-13

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-mode/ 「芋道源码」欢迎转载,保留摘要,谢谢!

微信排版总是蹦蹦的,请点击【阅读原文】,抱歉~

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. 实现

  • 3. BlockingObservable

  • 666. 彩蛋


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

本文主要分享 Hystrix 执行命令方法

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:

方法
#execute()同步调用,返回直接结果
#queue()异步调用,返回 java.util.concurrent.Future
#observe()异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber处理结果
#toObservable()未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber处理结果

    • 第四种方式,点击 #testToObservable()查看笔者补充的示例。






    推荐 Spring Cloud 书籍

    • 请支持正版。下载盗版,等于主动编写低级 BUG 。

    • 程序猿DD —— 《Spring Cloud微服务实战》

    • 周立 —— 《Spring Cloud与Docker微服务架构实战》

    • 两书齐买,京东包邮。

    2. 实现

    1. // AbstractCommand.java

    2. abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    3.    // ... 省略无关属性与方法

    4.    public Observable<R> toObservable() {

    5.        return Observable.defer(new Func0<Observable<R>>() {

    6.            @Override

    7.            public Observable<R> call() {

    8.                // ....

    9.            }

    10.        }

    11.    }

    12.    public Observable<R> observe() {

    13.        // us a ReplaySubject to buffer the eagerly subscribed-to Observable

    14.        ReplaySubject<R> subject = ReplaySubject.create();

    15.        // eagerly kick off subscription

    16.        final Subscription sourceSubscription = toObservable().subscribe(subject);

    17.        // return the subject that can be subscribed to later while the execution has already started

    18.        return subject.doOnUnsubscribe(new Action0() {

    19.            @Override

    20.            public void call() {

    21.                sourceSubscription.unsubscribe();

    22.            }

    23.        });

    24.    }

    25. }

    26. // HystrixCommand.java

    27. public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {

    28.    // ... 省略无关属性与方法

    29.    public Future<R> queue() {

    30.        final Future<R> delegate = toObservable().toBlocking().toFuture();

    31.        final Future<R> f = new Future<R>() {

    32.            // ... 包装 delegate

    33.        }

    34.        // ...

    35.        return f;

    36.    }

    37.    public R execute() {

    38.        try {

    39.            return queue().get();

    40.        } catch (Exception e) {

    41.            throw Exceptions.sneakyThrow(decomposeException(e));

    42.        }

    43.    }

    44.    protected abstract R run() throws Exception;

    45. }

    • #toObservable() 方法 :做订阅,返回干净的 Observable 。这就是为什么上文说“未调用”

    • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅



      • ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。

    • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:





      • #run() 方法 :子类实现该方法,执行正常的业务逻辑

      • Observable#toBlocking() 方法 :将 Observable 转换成阻塞的 rx.observables.BlockingObservable

      • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。



    • BlockingObservable 在 「3. BlockingObservable」 详细解析。

    • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。

    • 整理四种调用方式如下:


      FROM 《【翻译】Hystrix文档-实现原理》

    3. BlockingObservable

    本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:

    • 1 ) 跳过本小节,不影响对本文的理解。

    • 2 ) 选择阅读 《ReactiveX/RxJava文档中文版 —— 阻塞操作》 ,理解 BlockingObservable 的原理。

    • 3 ) 选择阅读本小节,理解 BlockingObservable 的原理以及实现。

    《RxJava 源码解析 —— BlockingObservable》

    666. 彩蛋

    第一篇 Hystrix 正式的源码解析。

    梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。

    胖友,分享一波朋友圈可好!


      您可能也对以下帖子感兴趣

      文章有问题?点此查看未经处理的缓存