Project Reactor 响应式编程

一. 什么是响应式编程?

在计算中,响应式编程或反应式编程是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

上面一段话来自维基百科。

响应式编程顾名思义就是在于响应二字,我们需要在某个事件发生时做出响应。

我们现实生活就是对响应式很好的解释,我们人类的举动大多都是基于事件驱动模式,当有人呼喊你的名字,你会根据这个事件来判断要不要进行应答,这个过程其实就是产生事件,然后我们作为消费者对事件进行处理,而我们的处理结果也会继续向下传递。

在响应式编程中,通常是采用异步回调的方式,回调方法的调用和控制则会由响应式框架来完成,对于应用开发来说只需要关注回调方法的实现就可以了。

这里提一个著名的设计原则:好莱坞原则(Hollywood principle)

Don’t call us, we will call you.

演员提交简历之后,回家等着就好,演艺公司会主动打电话给你。

二. Project Reactor介绍

Java中最早的Reactor库RxJava借鉴于.Net的Reactor Extensions,后来Jdk在Java9提供了标准化的响应式库实现java.util.concurrent.Flow,再后来,Project Reactor作为第四代响应式编程框架出现,它是一个完全非阻塞响应式编程的基石,直接集成了Java函数式API,特别是CompletableFuture,Stream和Duration。Reactor Netty实现了非阻塞跨进程通信,提升了服务间通信效率。

我们在平常开发中,异步编程无非是使用JUC包下的工具类或者一些Java同步语义。

  • 阻塞等待:如 Future.get()
  • 不安全的数据访问:如 ReentrantLock.lock()
  • 异常冒泡:如 try…​catch…​finally
  • 同步阻塞:如 synchronized{ }
  • Wrapper分配(GC 压力):如 new Wrapper(event)

或者自定义线程池,但也会遇到诸如一下的问题。

  • Callable 分配 — 可能导致 GC 压力。
  • 同步过程强制每个线程执行停-检查操作。
  • 消息的消费可能比生产慢。
  • 使用线程池(ThreadPool)将任务传递给目标线程 — 通过 FutureTask 方式肯定会产生 GC 压力。
  • 阻塞直至IO回调。

上面等等问题都会造成的系统性能瓶颈或者安全问题,在Future.get时我们无法避免阻塞等待,最差情况下程序运行其实还是同步的,使用Reactor不但可以很有效的解决上述问题,还能让我们写出更加简洁明了的代码。

三. Reactor核心概念

代码: //github.com/CasterWx/reactor-ppt

Flux

Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。

1. just()

可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。

Flux.just("hello", "world")
	.doOnNext((i) -> {
            System.out.println("[doOnNext] " + i);
        })
	.doOnComplete(() -> System.out.println("[doOnComplete]"))
	.subscribe(i -> System.out.println("[subscribe] " + i));

// 执行结果
[doOnNext] hello
[subscribe] hello
[doOnNext] world
[subscribe] world
[doOnComplete]

2. fromArray(),fromIterable()和 fromStream()

可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。

List<String> arr = Arrays.asList("flux", "mono", "reactor", "core");
Flux.fromIterable(arr)
	.doOnNext((i) -> {
            System.out.println("[doOnNext] " + i);
        })
	.subscribe((i) -> {
            System.out.println("[subscribe] " + i);
        });
//执行结果
[doOnNext] flux
[subscribe] flux
[doOnNext] mono
[subscribe] mono
[doOnNext] reactor
[subscribe] reactor
[doOnNext] core
[subscribe] core

3. empty()

创建一个不包含任何元素,只发布结束消息的序列。

 Flux.empty()
	.doOnNext(i -> {
            System.out.println("[doOnNext] " + i);
        }).doOnComplete(() -> {
            System.out.println("[DoOnComplete] ");
        }).subscribe(i -> {
            System.out.println("[subscribe] " + i);
        });
//执行结果
[DoOnComplete]

4. error(Throwable error)

创建一个只包含错误消息的序列。

try {
	int []arr = new int[5];
	arr[10] = 2;
} catch (Exception e) {
	Flux.error(e).subscribe(i -> {
	System.out.println("error subscribe");
	});
}
//执行结果

5. never():创建一个不包含任何消息通知的序列。

Flux.never()
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete");
        }).subscribe((i) -> {
            System.out.println("subscribe " + i);
        });
//执行结果
空

6. range(int start, int count)

创建包含从 start 起始的 count 个数量的 Integer 对象的序列。

Flux.range(5, 10)
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete");
        }).subscribe((i) -> {
            System.out.println("subscribe " + i);
        });
//执行结果
doOnNext 5
subscribe 5
doOnNext 6
subscribe 6
doOnNext 7
subscribe 7
doOnNext 8
subscribe 8
doOnNext 9
subscribe 9
doOnNext 10
subscribe 10
doOnNext 11
subscribe 11
doOnNext 12
subscribe 12
doOnNext 13
subscribe 13
doOnNext 14
subscribe 14
doOnComplete

7. interval(Duration period)和 interval(Duration delay, Duration period)

创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。

Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete " + new Date());
        }).subscribe((i) -> {
            System.out.println("subscribe " + i + ", date: " + new Date());
        });
try {
	Thread.sleep(10000);
} catch (InterruptedException e) {
	e.printStackTrace();
}
//执行结果
doOnNext 0
subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
doOnNext 1
subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
doOnNext 2
subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
doOnNext 3
subscribe 3, date: Fri Jun 25 10:18:02 CST 2021

上面实例为什么没有输出doOnComplete, 从第四秒开始,每两秒生产一个元素,等到最后complete时已经到了sleep的十秒时间,主线程main已经推出。

8. intervalMillis(long period)和 intervalMillis(long delay, long period)

与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

Mono

Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。

1. fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()

分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。

 Mono.fromCallable(() -> {
            System.out.println("begin callable");
            return "Hello";
        })
	.subscribeOn(Schedulers.elastic())
	.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
	.subscribe(System.out::println);
Thread.sleep(10000);
//执行结果
begin callable
doOnNext Hello, thread :elastic-2
Hello
Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
            System.out.println("begin");
            return "hello";
        }))
	.subscribeOn(Schedulers.elastic())
	.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
	.subscribe(System.out::println);
Thread.sleep(10000);
//执行结果
begin
doOnNext hello, thread :elastic-2
hello

2. delay(Duration duration)和 delayMillis(long duration)

创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。

Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
Thread.sleep(3000);
//执行结果, 延迟一秒后打印
0

3. ignoreElements(Publisher source)

创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。

Mono.ignoreElements((i) -> {
            System.out.println("ignoreElements");
        })
	.doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
//执行结果
ignoreElements

4. justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

Optional<Integer> optional = Optional.empty();
Mono.justOrEmpty(optional)
	.doOnNext((i) -> System.out.println("doOnNext " + i))
	.subscribe(System.out::println);

System.out.println("========");

optional = Optional.of(100);
Mono.justOrEmpty(optional)
	.doOnNext((i) -> System.out.println("doOnNext " + i))
	.subscribe(System.out::println);
//执行结果
========
doOnNext 100
100

操作符

1. buffer 和 bufferTimeout

这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。

除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

//执行结果
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

2. filter

对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。

Flux.range(1, 10)
	.filter(i -> i%2==0)
        .doOnNext(i -> {
		System.out.println("[doOnNext] " + i);
	})
	.subscribe(i -> {
		System.out.println("subscribe " + i);
	});
//执行结果
[doOnNext] 2
subscribe 2
[doOnNext] 4
subscribe 4
[doOnNext] 6
subscribe 6
[doOnNext] 8
subscribe 8
[doOnNext] 10
subscribe 10

3. window

window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux

Flux.range(1, 15).window(5)
	.doOnNext((flux -> {}))
	.subscribe(flux -> {
		flux.doOnNext((item) -> {
			System.out.println("[window] flux: " + item);
		})
	.doOnComplete(() -> System.out.println("flux item complete"))
	.subscribe();
});
// 执行结果
[window] flux: 1
[window] flux: 2
[window] flux: 3
[window] flux: 4
[window] flux: 5
flux item complete
[window] flux: 6
[window] flux: 7
[window] flux: 8
[window] flux: 9
[window] flux: 10
flux item complete
[window] flux: 11
[window] flux: 12
[window] flux: 13
[window] flux: 14
[window] flux: 15
flux item complete

4. zipWith

zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

Flux.just("Hello", "Project")
	.zipWith(Flux.just("World", "Reactor"))
	.subscribe(System.out::println);

System.out.println("======");

Flux.just("Hello", "Project")
	.zipWith(Flux.just("World", "Reactor"), (s1, s2) -> String.format("%s!%s!", s1, s2))
	.subscribe(System.out::println);
// 执行结果
Hello,World
Project,Reactor
======
Hello!World!
Project!Reactor!

5. take

take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。

take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。

Flux.range(1, 10).take(2).subscribe(System.out::println);
// 执行结果
1
2
  1. takeLast(long n):提取流中的最后 N 个元素。
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
// 执行结果
9
10
  1. takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
// 执行结果
1
2
3
4
5
6
  1. takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
// 执行结果
1
2
3
4
  1. takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。
Flux.range(1, 5).takeUntilOther((i) -> {
	try {
		Thread.sleep(1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}).subscribe(System.out::println);
// 执行结果,暂停1000ms后开始输出
1
2
3
4
5

6. reduce 和 reduceWith

reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

Flux.range(1, 10)
	.reduce((x, y) -> {
		System.out.println("x:" + x + ", y:" + y);
		return x+y;
	})
	.subscribe(System.out::println);
// 执行结果
x:1, y:2
x:3, y:3
x:6, y:4
x:10, y:5
x:15, y:6
x:21, y:7
x:28, y:8
x:36, y:9
x:45, y:10
55
Flux.range(1, 10)
	.reduceWith(() -> 100, (x, y) -> {
	 System.out.println("x:" + x + ", y:" + y);
                    return x+y;
                })
                .subscribe(System.out::println);
// 执行结果
x:100, y:1
x:101, y:2
x:103, y:3
x:106, y:4
x:110, y:5
x:115, y:6
x:121, y:7
x:128, y:8
x:136, y:9
x:145, y:10
155

7. merge 和 mergeSequential

merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

Flux.merge(Flux.interval(
	Duration.of(0, ChronoUnit.MILLIS),
	Duration.of(100, ChronoUnit.MILLIS)).take(2),
	Flux.interval(
		Duration.of(50, ChronoUnit.MILLIS),
		Duration.of(100, ChronoUnit.MILLIS)).take(2))
	.toStream()
	.forEach(System.out::println);
System.out.println("==============");
Flux.mergeSequential(Flux.interval(
	Duration.of(0, ChronoUnit.MILLIS),
	Duration.of(100, ChronoUnit.MILLIS)).take(2),
	Flux.interval(
		Duration.of(50, ChronoUnit.MILLIS),
		Duration.of(100, ChronoUnit.MILLIS)).take(2))
	.toStream()
                .forEach(System.out::println);
// 执行结果
0
0
1
1
==============
0
1
0
1

8. flatMap 和 flatMapSequential

flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。

Flux.just(1, 2)
	.flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x))
	.toStream()
	.forEach(System.out::println);
// 执行结果
0
0
1

9. concatMap 和 combineLatest

concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

Flux.just(5, 10)
	.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
	.toStream()
	.forEach(System.out::println);

Flux.combineLatest(
	Arrays::toString,
	Flux.intervalMillis(100).take(5),
	Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);

四. 结束

上文已经简单介绍了Reactor的两个核心概念Flux和Mono,以及一些常用操作符的使用,刚开始使用响应式编程范式对于部分开发人员来说可能极度困难,但熟能生巧,长期使用让思维方式转变才能领会到响应式编程的优点。

Tags: