# Reactive-Programming **Repository Path**: seekerszy/reactive-programming ## Basic Information - **Project Name**: Reactive-Programming - **Description**: 响应式编程 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-05-06 - **Last Updated**: 2022-05-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Future 使用Future是目前日常开发中的习惯性选择,但是Future在一些场景中存在局限性。 使用Future是异步阻塞式的编程方式,而且它无法实现异步任务的组合编排。这意味着虽然提交了异步任务后,主线程可以去做其他事情,但是当尝试获取异步任务结果时,流程还是会阻塞,此时若有其他流程本可以开始执行,也需要被迫等待,大大影响了运行效率。 **场景:完成A, B, C三项任务。C依赖于A。** | A=>C | B ```java public class FutureTest { private static CountDownLatch COUNT_DOWN; private static ExecutorService EXECUTOR_SERVICE; @Before public void setup() { COUNT_DOWN = new CountDownLatch(3); EXECUTOR_SERVICE = Executors.newCachedThreadPool(); } @After public void cleanup(){ EXECUTOR_SERVICE.shutdownNow(); } @Test public void test() throws InterruptedException { Util.timeWatch(); Future taskA = EXECUTOR_SERVICE.submit(() -> finishTaskA()); Future taskB = EXECUTOR_SERVICE.submit(() -> finishTaskB()); String A = getFuture(taskA); String B = getFuture(taskB); Future taskC = EXECUTOR_SERVICE.submit(() -> finishTaskC(A)); String C = getFuture(taskC); COUNT_DOWN.await(); Util.timeWatch(); } private static String finishTaskA() { Util.sleep(100L); Util.print("finish task A."); COUNT_DOWN.countDown(); return "A"; } private static String finishTaskB() { Util.sleep(200L); Util.print("finish task B."); COUNT_DOWN.countDown(); return "B"; } private static String finishTaskC(String A) { Util.sleep(300L); Util.print("finish task C based on " + A); COUNT_DOWN.countDown(); return "C"; } public static T getFuture(Future future) { try { return future.get(1, TimeUnit.SECONDS); } catch (Exception e) { future.cancel(true); } return null; } } ``` result: pool-1-thread-1=========finish task A. pool-1-thread-2=========finish task B. pool-1-thread-2=========finish task C based on A main=========507 ## 替代方案 ### ListenableFuture ```java public class ListenableFutureTest { private static CountDownLatch COUNT_DOWN; private static ListeningExecutorService EXECUTOR_SERVICE; @Before public void setup() { COUNT_DOWN = new CountDownLatch(3); EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); } @After public void cleanup() { EXECUTOR_SERVICE.shutdownNow(); } @Test public void test() throws InterruptedException { Util.timeWatch(); ListenableFuture taskA = EXECUTOR_SERVICE.submit(() -> finishTaskA()); Futures.addCallback(taskA, new FutureCallback<>() { @Override public void onSuccess(String A) { finishTaskC(A); } @Override public void onFailure(Throwable t) {} }); ListenableFuture taskB = EXECUTOR_SERVICE.submit(() -> finishTaskB()); String A = getFuture(taskA); String B = getFuture(taskB); COUNT_DOWN.await(); Util.timeWatch(); } @Test public void callbackHell() throws InterruptedException { COUNT_DOWN = new CountDownLatch(4); Util.timeWatch(); ListenableFuture taskA = EXECUTOR_SERVICE.submit(() -> finishTaskA()); Futures.addCallback(taskA, new FutureCallback<>() { @Override public void onSuccess(String A) { ListenableFuture taskC = EXECUTOR_SERVICE.submit(() -> finishTaskC(A)); Futures.addCallback(taskC, new FutureCallback<>() { @Override public void onSuccess(String C) { finishTaskD(C); } @Override public void onFailure(Throwable t) {} }); } @Override public void onFailure(Throwable t) {} }); ListenableFuture taskB = EXECUTOR_SERVICE.submit(() -> finishTaskB()); String A = getFuture(taskA); String B = getFuture(taskB); COUNT_DOWN.await(); Util.timeWatch(); } private static String finishTaskA() { Util.sleep(100L); Util.print("finish task A."); COUNT_DOWN.countDown(); return "A"; } private static String finishTaskB() { Util.sleep(200L); Util.print("finish task B."); COUNT_DOWN.countDown(); return "B"; } private static String finishTaskC(String A) { Util.sleep(300L); Util.print("finish task C based on " + A); COUNT_DOWN.countDown(); return "C"; } private static String finishTaskD(String C) { Util.sleep(400L); Util.print("finish task D based on " + C); COUNT_DOWN.countDown(); return "C"; } public static T getFuture(Future future) { try { return future.get(1, TimeUnit.SECONDS); } catch (Exception e) { future.cancel(true); } return null; } } ``` result: pool-1-thread-1=========finish task A. pool-1-thread-2=========finish task B. pool-1-thread-1=========finish task C based on A main=========435 ### CompletableFuture ```java public class CompletableFutureTest { private static ExecutorService EXECUTOR_SERVICE; @Before public void setup() { EXECUTOR_SERVICE = Executors.newCachedThreadPool(); } @After public void cleanup() { EXECUTOR_SERVICE.shutdownNow(); } @Test public void test() { Util.timeWatch(); CompletableFuture taskA = CompletableFuture.supplyAsync(() -> finishTaskA(), EXECUTOR_SERVICE) .orTimeout(1, TimeUnit.SECONDS) .exceptionally(e -> null); CompletableFuture taskC = taskA.thenApply((a) -> finishTaskC(a)) .orTimeout(1, TimeUnit.SECONDS) .exceptionally(e -> null); CompletableFuture taskB = CompletableFuture.supplyAsync(() -> finishTaskB(), EXECUTOR_SERVICE) .orTimeout(1, TimeUnit.SECONDS) .exceptionally(e -> null); CompletableFuture end = CompletableFuture.allOf(taskC, taskB); end.join(); Util.timeWatch(); } private static String finishTaskA() { Util.sleep(100L); Util.print("finish task A."); return "A"; } private static String finishTaskB() { Util.sleep(200L); Util.print("finish task B."); return "B"; } private static String finishTaskC(String A) { Util.sleep(300L); Util.print("finish task C based on " + A); return "C"; } } ``` result: pool-1-thread-1=========finish task A. pool-1-thread-2=========finish task B. pool-1-thread-1=========finish task C based on A main=========423 **API简介:** - supplyAsync:提交一个异步任务,有返回值 - runAsync:提交一个异步任务,无返回值 - thenCombine(Async):合并两个异步任务的结果进行处理,有返回值 - thenAccept(Async):合并两个异步任务的结果进行处理,无返回值 - runAfterBoth(Async):合并两个异步任务,但不关心任务的结果,无返回值 - thenApply(Async):接收前一个任务的结果作为入参,执行下一个任务,有返回值 - thenCompose(Async):接收前一个任务的结果并构建CompletionStage作为入参,执行下一个任务,有返回值 - thenAccept(Async): 接收前一个任务的结果进行处理,无返回值 - thenRun(Async):不会接收前一个任务的结果,无返回值 - applyToEither(Async):获取两个异步任务中最先完成的那一个的结果进行处理,有返回值 - acceptEither(Async):获取两个异步任务中最先完成的那一个的结果进行处理,无返回值 - runAfterEither(Async):获取两个异步任务中最先完成的那一个的结果后进行处理,但不关心结果,无返回值 - exceptionally:接收异常,同步进行处理,有返回值 - handle(Async):无论任务最终能正常返回或是抛出异常,都能根据结果进行处理,有返回值 - whenComplete(Async):无论任务最终能正常返回或是抛出异常,都能根据结果进行处理,无返回值 - anyOf:获取多个异步任务中最先完成的那一个,有返回值。 - allOf:多个异步任务都完成后,才能开始后续处理,无返回值。 - get:阻塞获取异步任务返回值,会抛出检查时异常,编译期需强制捕获异常或对外抛出 - join:阻塞获取异步任务返回值,会抛出运行时异常 **API名称后带Async后缀,代表该方法是另起一个异步线程执行,反之则与前置任务同步进行** **API中带有Executor参数,代表该方法执行时使用的异步线程由指定的线程池管理,反之则使用默认的公共FolkJoinPool统一管理** ### CompletableFuture VS Reactor ```java public class FutureVsReactor { private static Map NAME_MAP; private static Map PRICE_MAP; @Before public void setup() { NAME_MAP = new HashMap() { { put(1L, "DIOR"); put(2L, "LANCOME"); put(3L, "CLINIQUE"); put(4L, "SEPHORA"); put(5L, "YSL"); } }; PRICE_MAP = new HashMap() { { put(1L, new BigDecimal("385.00")); put(2L, new BigDecimal("312.00")); put(3L, new BigDecimal("760.00")); put(4L, new BigDecimal("288.00")); put(5L, new BigDecimal("315.00")); } }; } @After public void cleanup() { NAME_MAP.clear(); PRICE_MAP.clear(); } @Test public void test1() { Util.timeWatch(); CompletableFuture> ids = getIdList(); CompletableFuture> result = ids.thenCompose(list -> { List> combineTask = list.stream().map(id -> { CompletableFuture nameTask = getNameFuture(id); CompletableFuture priceTask = getPriceFuture(id); return nameTask.thenCombineAsync(priceTask, (name, price) -> "brand " + name + " is " + price); }).collect(Collectors.toList()); CompletableFuture[] combinationArray = combineTask.toArray(new CompletableFuture[combineTask.size()]); CompletableFuture allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combineTask.stream().map(CompletableFuture::join).collect(Collectors.toList())); }); System.out.println(result.join()); Util.timeWatch(); } @Test public void test2() { Util.timeWatch(); Flux ids = reactGetIdList(); Flux combinations = ids.flatMap(id -> { Mono nameTask = reactGetName(id); Mono priceTask = reactGetPrice(id); return nameTask.zipWith(priceTask, (name, price) -> "brand " + name + " is " + price); }); Mono> result = combinations.collectList(); System.out.println(result.block()); Util.timeWatch(); } private static CompletableFuture getPriceFuture(Long i) { return CompletableFuture.supplyAsync(() -> getPriceById(i)); } private static CompletableFuture getNameFuture(Long i) { return CompletableFuture.supplyAsync(() -> getNameById(i)); } private static CompletableFuture> getIdList() { return CompletableFuture.supplyAsync(() -> { Util.sleep(100); Util.print("getIdList"); return Arrays.asList(1L, 2L, 3L, 4L, 5L); }); } private static String getNameById(Long id) { Util.sleep(100); Util.print("getNameById id: " + id); return NAME_MAP.get(id); } private static BigDecimal getPriceById(Long id) { Util.sleep(150); Util.print("getPriceById id: " + id); return PRICE_MAP.get(id); } private static Mono reactGetPrice(Long id) { return Mono.fromCallable(() -> getPriceById(id)).subscribeOn(Schedulers.elastic()); } private static Mono reactGetName(Long id) { return Mono.fromCallable(() -> getNameById(id)).subscribeOn(Schedulers.elastic()); } private static Flux reactGetIdList() { return Flux.create((Consumer>)fluxSink -> { for (Long i = 1L; i <= 5L; i++) { fluxSink.next(i); } fluxSink.complete(); Util.sleep(100); Util.print("getIdList"); }); } } ``` # 响应式编程 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式编程规范,它定义了一系列标准接口和交互规范。 基于这套标准接口和规范,涌现出了一系列响应式编程框架: - RxJava 2.x - JDK9 Flow - Reactor 在计算机领域,响应式编程是一个专注于数据流变化传递的异步编程范式。 ## 特点 - 面向数据流:链式调用风格 - 发布订阅模型:数据流从发布者上游下发,被下游订阅者处理 - ”观察者模式“的扩展:不光监听数据的到达,同时具备发布者下发完成通知,错误传播和与上游通信的能力 - 命令式 (imperative) VS 声明式 (declarative):开发者负责描述发布订阅的处理逻辑而不是精确地控制流程中的每一个细节。 ```java public class ImperativeVSDeclarative { @Test public void testImperative() { List source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); Iterator iterator = source.iterator(); while (iterator.hasNext()) { Integer number = iterator.next(); try { Util.print(-number); } catch (Exception e) { e.printStackTrace(); } } Util.print("处理完成"); } @Test public void testDeclarative() { Flux source = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); source.map(number -> -number).subscribe( // onNext number -> Util.print(number), // onError e -> e.printStackTrace(), // onComplete () -> Util.print("处理完成")); } } ``` ## Reactor 从标准接口和规范说起 ```java org.reactivestreams reactive-streams public interface Publisher { void subscribe(Subscriber var1); } public interface Subscriber { void onSubscribe(Subscription var1); void onNext(T var1); void onError(Throwable var1); void onComplete(); } public interface Subscription { void request(long var1); void cancel(); } public interface Processor extends Subscriber, Publisher {} ``` ```java Flux.just("HelloWorld").subscribe(); ``` ![响应式交互示意图](/pic/reactive-stream-triangle.jpg) 1. Publisher与Subscriber通过**subscribe()**方法建立订阅关系。 2. Subscriber通过**onSubscribe()**方法对订阅的行为作出响应。由Publisher在subscribe()方法中构建一个Subscription对象,传入onSubscribe()方法,使Subscriber可以通过Subscription与Publisher反向通信。 3. Subscriber通过Subscription调用**request()**方法向Publisher发起请求,指定从Publisher获取的元素数量。 4. Publisher接收到请求,委托由其构建的Subscription对象负责下发元素。 5. Subscriber接收到元素,调用**onNext()**方法进行消费。如果消费过程中出现异常,会调用**onError()**方法对异常作出响应。如果Publisher已生产完毕,无法继续下发元素,则会通知Subscriber,Subscriber通过**onComplete()**方法,对完成作出响应。 6. 若Subscriber消费完毕,会再次调用**request()**方法对Publisher发起请求,或者视情况调用**cancel()**方法通知Publisher取消下发元素。 ### Publisher 发布者 在Reactor中发布者可分为两大类:**Flux 和 Mono** **Flux**:代表一个可以输出0....N项的序列。----------- **Collection** ![](https://projectreactor.io/docs/core/release/reference/images/flux.svg) ```java public class FluxTest { @Test public void testJust() { Flux.just(1, 2, 3).subscribe(Util::print); } @Test public void testFromIterable() { Flux.fromIterable(Arrays.asList(1, 2, 3)).subscribe(Util::print); } @Test public void testFromStream() { Flux.fromStream(Stream.of(1, 2, 3)).subscribe(Util::print); } /** * 编程式创建序列 * 状态感知型下发元素 */ @Test public void testGenerate() { Flux.generate(() -> 0, (state, sink) -> { if (state > 10) { sink.complete(); } else { Util.print("生产者生产内容:" + state); sink.next(state); } return ++state; }).subscribe(Util::print); /*for (int i = 0; i < 3; i++) { 生产元素 }*/ } /** * 编程式创建序列 * 无状态下发元素 */ @Test public void testCreateSync() { Flux.create(sink -> { for (int i = 0; i < 50; i++) { Util.print("发射了元素" + i); sink.next(i); } sink.complete(); }).subscribe(Util::print); } /** * 编程式创建序列 * 可以结合异步下发元素 */ @Test public void testCreateAsync() { BlockingQueue queue = new ArrayBlockingQueue<>(50); new Thread(() -> { for (int i = 0; i < 50; i++) { queue.offer(i); } }).start(); Flux.create(sink -> { try { Integer i; while ((i = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { sink.next(i); } sink.complete(); } catch (InterruptedException e) { e.printStackTrace(); } }).subscribe(Util::print); } } ``` **Mono**:代表一个可以输出0-1项的序列----------- **Optional** ![](https://projectreactor.io/docs/core/release/reference/images/mono.svg) ```java public class MonoTest { @Test public void testJust(){ Mono.just(1).subscribe(Util::print); } @Test public void testFromCallable(){ Mono.fromCallable(()->1).subscribe(Util::print); } @Test public void testFromCompletableFuture(){ Mono.fromFuture(CompletableFuture.supplyAsync(()->1)).subscribe(Util::print); } } ``` ### Subscriber 订阅者 subscribe()方法有多个重载方法,最多可以定义onNext, onError, onComplete, onSubscribe四种行为,生成的最终消费者是一个LambdaSubscriber对象。 如果希望将每个订阅者的消费行为单独管理,单独封装,可以采用自定义订阅者BaseSubscriber,此外它还提供了更多的回调函数供开发者实现。 ```java public final Disposable subscribe( @Nullable Consumer consumer, @Nullable Consumer errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer subscriptionConsumer) { return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer, completeConsumer, subscriptionConsumer, null)); } ``` ```java public class SubscriberTest { @Test public void testLambdaSubscriber() { Flux.just(1, 2, 3).subscribe( // onNext i -> Util.print("onNext: " + i), // onError e -> e.printStackTrace(), // onComplete () -> Util.print("onComplete"), // onSubscriber s -> { Util.print("onSubscribe"); s.request(3); }); } @Test public void testBaseSubscriber() { Flux flux = Flux.just(1, 2, 3); flux.map(i -> i - 1).subscribe(new BaseSubscriber<>() { @Override protected void hookOnSubscribe(Subscription s) { Util.print("onSubscribe"); s.request(3); } @Override protected void hookOnNext(Integer i) { Util.print("onNext: " + i); } @Override protected void hookOnError(Throwable e) { e.printStackTrace(); } @Override protected void hookOnComplete() { Util.print("onComplete"); } @Override protected void hookOnCancel() { Util.print("cancel"); } @Override protected void hookFinally(SignalType type) { Util.print("Finally"); } }); } } ``` ### Backpressure 背压 *the ability for the consumer to signal the producer that the rate of emission is too high* **背压策略**: - IGNORE:完全忽略下游的背压请求,直接向下游下发元素。当下游队列充满时会抛出IllegalStateException。 - ERROR:下游来不及处理数据则抛出IllegalStateException。 - DROP:下游来不及处理的数据全部扔掉,因此下游请求获取数据时可能会等待,直到上游刚下发一个新元素。 - LATEST:下游每次请求都会获取下发序列中最新的元素,来不及处理的数据不会扔掉。 - BUFFER:来不及处理的数据都会缓存在上游 (默认) 。 ```java public class BackPressureTest { private static final CountDownLatch countDownLatch = new CountDownLatch(1); /** * 背压策略 */ @Test public void testBackPressureStrategy() throws InterruptedException { // 订阅者 SlowSubscriber slowSubscriber = new SlowSubscriber(); // 发布者 Flux fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER) .doOnRequest(n -> print("下游------>向上游请求" + n + "个数据")) .publishOn(Schedulers.single(), 1); fastPublisher.subscribe(slowSubscriber); countDownLatch.await(); } private static Flux createFlux(FluxSink.OverflowStrategy strategy) { return Flux.create(sink -> { for (int i = 0; i < 5; i++) { sink.next(i); print("上游------>数据源创建了数据:" + i); } sink.complete(); }, strategy); } private static class SlowSubscriber extends BaseSubscriber { @Override protected void hookOnSubscribe(Subscription subscription) { // 订阅时请求1个数据 print("订阅开始"); request(1); } @Override protected void hookOnNext(Integer i) { print(("线程" + Thread.currentThread().getName() + "接收数据:" + i)); request(1); sleep(30); } @Override protected void hookOnComplete() { print("完成"); countDownLatch.countDown(); } } } ``` - **BUFFER:** main=========订阅开始 main=========下游------>向上游请求1个数据 main=========上游------>数据源创建了数据:0 single-1=========线程single-1接收数据:0 main=========上游------>数据源创建了数据:1 main=========上游------>数据源创建了数据:2 single-1=========下游------>向上游请求1个数据 single-1=========线程single-1接收数据:1 main=========上游------>数据源创建了数据:3 main=========上游------>数据源创建了数据:4 single-1=========下游------>向上游请求1个数据 single-1=========线程single-1接收数据:2 single-1=========下游------>向上游请求1个数据 single-1=========线程single-1接收数据:3 single-1=========下游------>向上游请求1个数据 single-1=========线程single-1接收数据:4 single-1=========下游------>向上游请求1个数据 single-1=========完成 - **LATEST:** main=========订阅开始 main=========下游------>向上游请求1个数据 main=========上游------>数据源创建了数据:0 single-1=========线程single-1接收数据:0 main=========上游------>数据源创建了数据:1 single-1=========下游------>向上游请求1个数据 main=========上游------>数据源创建了数据:2 single-1=========线程single-1接收数据:2 main=========上游------>数据源创建了数据:3 single-1=========下游------>向上游请求1个数据 main=========上游------>数据源创建了数据:4 single-1=========线程single-1接收数据:4 single-1=========下游------>向上游请求1个数据 single-1=========完成 ### Operator 操作符 当业务处理十分复杂时,我们倾向于将复杂逻辑拆分成一个个小任务进行封装,于是就有了Operator操作符。 Reactor中操作符就像流水线上的工人,每一个操作符负责处理上游来的数据,并下发至下游操作符,最终到达订阅者以结束整个流程。 ```java public class OperatorTest { @Test public void testFilter() { // 2 4 6 8 10 Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(Util::print); } @Test public void testBuffer() { // [1,2] [3,4] [5,6] Flux.just(1, 2, 3, 4, 5, 6).buffer(2).subscribe(Util::print); } @Test public void testTake() { // 1 2 Flux.just(1, 2, 3, 4, 5, 7).take(2).subscribe(Util::print); } @Test public void testFlatMap() { // -1 -3 -5 -2 -4 -6 Flux.just(Arrays.asList(1, 3, 5), Arrays.asList(2, 4, 6)). flatMap(x -> Flux.fromStream(x.stream().map(i -> -i))).subscribe(Util::print); } } ``` **由于操作符起到了承上启下的作用,在扮演的角色上,它既可以看作是一个中间发布者,又可以看作是一个中间订阅者。** ```java final class FluxFilter extends InternalFluxOperator { // ... static final class FilterSubscriber implements InnerOperator, Fuseable.ConditionalSubscriber { // ... } } ``` ![](/pic/FluxOperator.jpg) ![](/pic/InnerOperator.jpg) **需要注意,整个调用链直到订阅者订阅发布者时(即subscribe()方法被调用),发布订阅流程才会开始运作。** ```java Flux.range(1, 9) .map(i -> i + 1) .filter(i -> i % 2 == 0) .subscribe( // onNext i -> Util.print("onNext: " + i), // onError e -> e.printStackTrace(), // onComplete () -> Util.print("onComplete")); ``` - 订阅链组装阶段: - ```java public final Flux map(Function mapper) { if (this instanceof Fuseable) { return onAssembly(new FluxMapFuseable<>(this, mapper)); } // FluxRange return onAssembly(new FluxMap<>(this, mapper)); } abstract class InternalFluxOperator extends FluxOperator implements Scannable, OptimizableOperator { @Nullable final OptimizableOperator optimizableOperator; protected InternalFluxOperator(Flux source) { super(source); this.optimizableOperator = source instanceof OptimizableOperator ? (OptimizableOperator) source : null; } } public final void subscribe(Subscriber actual) { // FluxFilter CorePublisher publisher = Operators.onLastAssembly(this); // LambdaSubscriber CoreSubscriber subscriber = Operators.toCoreSubscriber(actual); if (publisher instanceof OptimizableOperator) { OptimizableOperator operator = (OptimizableOperator) publisher; while (true) { // 建立Subscriber之间的联系 subscriber = operator.subscribeOrReturn(subscriber); // ... // FluxMap OptimizableOperator newSource = operator.nextOptimizableSource(); // ... operator = newSource; } } publisher.subscribe(subscriber); } public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { if (actual instanceof ConditionalSubscriber) { return new FilterConditionalSubscriber<>((ConditionalSubscriber) actual, predicate); } return new FilterSubscriber<>(actual, predicate); } ``` **作为发布者,后一个节点持有前一个节点的引用。** **作为订阅者,前一个节点持有后一个节点的引用。** ![](/pic/SubscriberChain.jpg) - 运行阶段 - ```java // FluxRange的subscribe方法 public void subscribe(CoreSubscriber actual) { // ... if (actual instanceof ConditionalSubscriber) { actual.onSubscribe(new RangeSubscriptionConditional((ConditionalSubscriber) actual, st, en)); return; } // MapSubscriber actual.onSubscribe(new RangeSubscription(actual, st, en)); } ``` - image-20211110153952012 ### Scheduler 调度器 *Reactor, like RxJava, can be considered to be **concurrency-agnostic**.* Reactor是一个响应式编程框架,并不是一个异步编程框架。如果希望在Reactor中使用异步编程,则需要调度器和一些操作符的支持。 **调度器**:负责异步任务调度。 **Schedulers.single()**:生成含有一个线程的调度器 **Schedulers.parallel()**:生成默认含有CPU可用核心数个数线程的调度器 **Schedulers.elastic()**:生成一个支持弹性伸缩的调度器 **Schedulers.fromExecutor(ExecutorService)**:基于现有线程池生成的调度器 #### 与线程池的区别 调度器调度资源的最小单位是线程池 ```java ParallelScheduler(int parallelism, ThreadFactory factory) { if (n <= 0) { throw new IllegalArgumentException("n > 0 required but it was " + n); } this.n = parallelism; this.factory = factory; init(n); } void init(int n) { ScheduledExecutorService[] a = new ScheduledExecutorService[n]; for (int i = 0; i < n; i++) { a[i] = Schedulers.decorateExecutorService(this, this.get()); } EXECUTORS.lazySet(this, a); } public ScheduledExecutorService get() { ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1, factory); poolExecutor.setMaximumPoolSize(1); poolExecutor.setRemoveOnCancelPolicy(true); return poolExecutor; } ``` #### 操作符 publishOn 将publishOn下游调用链的执行全部置于另一个线程 ``` Flux publishOn(Scheduler scheduler, int prefetch) ``` ```java public static void main(String[] args) { // 定义初始状态为0 Flux.generate(() -> 0, (BiFunction, Integer>)(state, sink) -> { if (state == 5) { // state更新至5时,结束生产 sink.complete(); } else { // 生产内容 Util.print("生产者生产内容:" + state); sink.next(state); Util.sleep(100); } return ++state; }).publishOn(Schedulers.parallel(), 2) .map(state -> { Util.print("将上游过来的数据state取反: " + -state); return -state; }).subscribe( // onNext (i) -> Util.print("订阅者接收到内容:" + i), // onError (e) -> e.printStackTrace(), // onComplete () -> { Util.print("Subscriber consume finished."); }); } ``` - **生产速度 < 消费速度:** main=========生产者下发内容:0 parallel-1=========将上游过来的数据state取反: 0 parallel-1=========订阅者接收到内容:0 main=========生产者下发内容:1 parallel-1=========将上游过来的数据state取反: -1 parallel-1=========订阅者接收到内容:-1 main=========生产者下发内容:2 parallel-1=========将上游过来的数据state取反: -2 parallel-1=========订阅者接收到内容:-2 main=========生产者下发内容:3 parallel-1=========将上游过来的数据state取反: -3 parallel-1=========订阅者接收到内容:-3 main=========生产者下发内容:4 parallel-1=========将上游过来的数据state取反: -4 parallel-1=========订阅者接收到内容:-4 parallel-1=========Subscriber consume finished. - **生产速度 > 消费速度** main=========生产者下发内容:0 main=========生产者下发内容:1 parallel-1=========将上游过来的数据state取反: 0 parallel-1=========订阅者接收到内容:0 parallel-1=========将上游过来的数据state取反: -1 parallel-1=========订阅者接收到内容:-1 parallel-1=========生产者下发内容:2 parallel-1=========生产者下发内容:3 parallel-1=========将上游过来的数据state取反: -2 parallel-1=========订阅者接收到内容:-2 parallel-1=========将上游过来的数据state取反: -3 parallel-1=========订阅者接收到内容:-3 parallel-1=========生产者下发内容:4 parallel-1=========将上游过来的数据state取反: -4 parallel-1=========订阅者接收到内容:-4 parallel-1=========Subscriber consume finished. **原理:** ```java // FluxPublishOn - PublishOnSubscriber void runAsync() { int missed = 1; final Subscriber a = actual; final Queue q = queue; long e = produced; for (; ; ) { long r = requested; while (e != r) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { // .... return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } // 队列中获取不到元素则返回 if (empty) { break; } // 继续下发 a.onNext(v); // 消费计数 e++; // 消费累计达到上次的请求数量 if (e == limit) { if (r != Long.MAX_VALUE) { r = REQUESTED.addAndGet(this, -e); } // 请求上游 s.request(e); e = 0L; } } // ..... } } ``` #### 操作符 subscribeOn 将subscribeOn上游调用链的执行全部置于另一个线程 ``` Flux subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread) ``` ```java public static void main(String[] args) { Flux.create(sink -> { for (int i = 0; i < 10; i++) { Util.print("发射了元素" + i); sink.next(i); Util.sleep(200); } sink.complete(); }).doOnRequest(x -> System.out.println("onRequest的线程是:" + Thread.currentThread().getName())) .subscribeOn(getMyScheduler(), false) .publishOn(Schedulers.elastic(), 2) .subscribe( // onNext (x) -> Util.print("消费了元素" + x), // onError Throwable::printStackTrace, // onComplete () -> { Util.print("Subscriber consume finished."); }); } public static Scheduler getMyScheduler() { Executor executor = new ThreadPoolExecutor( 10, //corePoolSize 10, //maximumPoolSize 0L, TimeUnit.MILLISECONDS, //keepAliveTime, unit new LinkedBlockingQueue<>(1000), //workQueue Executors.defaultThreadFactory() ); return Schedulers.fromExecutor(executor); } ``` - **requestOnSeparateThread = true** onRequest的线程是:pool-1-thread-1 pool-1-thread-1=========发射了元素0 pool-1-thread-1=========发射了元素1 elastic-2=========消费了元素0 pool-1-thread-1=========发射了元素2 elastic-2=========消费了元素1 onRequest的线程是:pool-1-thread-2 elastic-2=========消费了元素2 pool-1-thread-1=========发射了元素3 pool-1-thread-1=========发射了元素4 elastic-2=========消费了元素3 onRequest的线程是:pool-1-thread-3 pool-1-thread-1=========发射了元素5 elastic-2=========消费了元素4 pool-1-thread-1=========发射了元素6 elastic-2=========消费了元素5 onRequest的线程是:pool-1-thread-4 pool-1-thread-1=========发射了元素7 elastic-2=========消费了元素6 pool-1-thread-1=========发射了元素8 elastic-2=========消费了元素7 onRequest的线程是:pool-1-thread-5 pool-1-thread-1=========发射了元素9 elastic-2=========消费了元素8 elastic-2=========消费了元素9 elastic-2=========Subscriber consume finished. - **questOnSeparateThread = false** onRequest的线程是:pool-1-thread-1 pool-1-thread-1=========发射了元素0 pool-1-thread-1=========发射了元素1 elastic-2=========消费了元素0 pool-1-thread-1=========发射了元素2 elastic-2=========消费了元素1 onRequest的线程是:elastic-2 pool-1-thread-1=========发射了元素3 elastic-2=========消费了元素2 pool-1-thread-1=========发射了元素4 elastic-2=========消费了元素3 onRequest的线程是:elastic-2 pool-1-thread-1=========发射了元素5 elastic-2=========消费了元素4 pool-1-thread-1=========发射了元素6 elastic-2=========消费了元素5 onRequest的线程是:elastic-2 pool-1-thread-1=========发射了元素7 elastic-2=========消费了元素6 pool-1-thread-1=========发射了元素8 elastic-2=========消费了元素7 onRequest的线程是:elastic-2 pool-1-thread-1=========发射了元素9 elastic-2=========消费了元素8 elastic-2=========消费了元素9 onRequest的线程是:elastic-2 elastic-2=========Subscriber consume finished. ```java void requestUpstream(final long n, final Subscription s) { if (!requestOnSeparateThread || Thread.currentThread() == THREAD.get(this)) { s.request(n); } else { try { worker.schedule(() -> s.request(n)); } catch (RejectedExecutionException ree) { if(!worker.isDisposed()) { throw Operators.onRejectedExecution(ree, this, null, null, actual.currentContext()); } } } } ``` #### 操作符 parallel & runOn parallel和runOn两个操作符配合使用后,可以实现多个订阅者并行消费,提升消费效率。 ```java ParallelFlux parallel(int parallelism, int prefetch) ParallelFlux runOn(Scheduler scheduler, int prefetch) ``` ![parallel示意图](/pic/ParallelFlux.png) ```java public static void main(String[] args) throws InterruptedException { Flux.generate(() -> 0, (BiFunction, Integer>)(state, sink) -> { if (state == 10) { sink.complete(); } else { // 生产内容 Util.print("发射了元素:" + state); sink.next(state); } return ++state; }).parallel(3, 6) .runOn(Schedulers.elastic(), 2) .subscribe( // onNext (x) -> Util.print("消费了元素" + x), // onError Throwable::printStackTrace, // onComplete () -> { Util.print("Subscriber consume finished."); countDownLatch.countDown(); }); countDownLatch.await(); } ``` - 结果: main=========发射了元素:0 main=========发射了元素:1 main=========发射了元素:2 main=========发射了元素:3 main=========发射了元素:4 main=========发射了元素:5 elastic-2=========消费了元素0 main=========发射了元素:6 elastic-2=========消费了元素3 main=========发射了元素:7 elastic-2=========消费了元素6 main=========发射了元素:8 elastic-4=========消费了元素2 elastic-3=========消费了元素1 main=========发射了元素:9 elastic-2=========消费了元素7 elastic-3=========消费了元素4 elastic-4=========消费了元素5 elastic-2=========消费了元素8 elastic-2=========消费了元素9 elastic-3=========Subscriber consume finished. elastic-4=========Subscriber consume finished. elastic-2=========Subscriber consume finished. ### Reactor使用中遇到的坑 #### subscribeOn ```java public static void main(String[] args) throws InterruptedException { Flux.create(sink -> { for (int i = 0; i < 10; i++) { Util.print("发射了元素" + i); sink.next(i); Util.sleep(200); } sink.complete(); }).doOnRequest(x -> System.out.println("onRequest的线程是:" + Thread.currentThread().getName())) .subscribeOn(Schedulers.parallel(), true) .publishOn(Schedulers.elastic(), 2) .subscribe( // onNext (x) -> Util.print("消费了元素" + x), // onError Throwable::printStackTrace, // onComplete () -> { Util.print("Subscriber consume finished."); }); } ``` - 结果: onRequest的线程是:parallel-1 parallel-1=========发射了元素0 elastic-2=========消费了元素0 parallel-1=========发射了元素1 elastic-2=========消费了元素1 **parallel-1=========发射了元素2** **parallel-1=========发射了元素3** **parallel-1=========发射了元素4** **parallel-1=========发射了元素5** **parallel-1=========发射了元素6** **parallel-1=========发射了元素7** **parallel-1=========发射了元素8** **parallel-1=========发射了元素9** onRequest的线程是:parallel-1 elastic-2=========消费了元素2 elastic-2=========消费了元素3 onRequest的线程是:parallel-1 elastic-2=========消费了元素4 elastic-2=========消费了元素5 onRequest的线程是:parallel-1 elastic-2=========消费了元素6 elastic-2=========消费了元素7 onRequest的线程是:parallel-1 elastic-2=========消费了元素8 elastic-2=========消费了元素9 elastic-2=========Subscriber consume finished. #### timeout 异步非阻塞的超时处理操作符 ```java /** * Switch to a fallback {@link Flux} as soon as no item is emitted within the * given {@link Duration} from the previous emission (or the subscription for the first item). *

* If the given {@link Publisher} is null, signal a {@link TimeoutException} instead. * * @param timeout the timeout between two signals from this {@link Flux} * @param fallback the fallback {@link Publisher} to subscribe when a timeout occurs * * @return a {@link Flux} that will fallback to a different {@link Publisher} in case of a per-item * timeout */ Flux timeout(Duration timeout, @Nullable Publisher fallback) ``` ```java public class TimeoutTest { private static CountDownLatch countDownLatch = new CountDownLatch(1); @Test public void testTimeout1() throws InterruptedException { Flux.just(((Supplier) () -> { Util.sleep(400L); return 1; }).get()) .timeout(Duration.ofMillis(300), Flux.just(2)) .subscribe( (x) -> Util.print("消费了元素" + x), Throwable::printStackTrace, () -> countDownLatch.countDown()); countDownLatch.await(); } @Test public void testTimeout2() throws InterruptedException { Flux.generate(() -> 1, (BiFunction, Integer>) (state, sink) -> { Util.sleep(400); sink.next(state); sink.complete(); return state; }).timeout(Duration.ofMillis(300), Flux.just(2)) .subscribe( (x) -> Util.print("消费了元素" + x), Throwable::printStackTrace, () -> countDownLatch.countDown()); countDownLatch.await(); } } ``` - testTimeout1: **main**=========消费了元素1 - testTimeout2: **parallel-1**=========消费了元素2