响应式流
Reactor是由Pivotal公司开发的开源框架,它是开发响应式应用的基础。如今,它是建立在由Netflix、Pivotal和Lightbend的工程师以及其他大型JAVA玩家(Oracle和Red Hat)提出的联合倡议(reactive streams initiative)的基础上。
在这个倡议中,响应式流的规范被创建,有以下几个关键因素:
- 响应式流应该是非阻塞的,
- 它是一种数据流,
- 它是异步工作的,
- 它需要能够处理背压。
制定以上标准的原因是:当我们通常写应用程序的时候,我们会进行数据库调用、HTTP调用,我们发起请求,阻塞线程,直到有响应返回,然后继续。虽然这种方式可以工作,但是这是一种资源浪费。
发布者Publisher
在开发过程中,不再返回简单的POJO对象,而必须返回其他内容,在结果可用的时候返回。在响应式流的规范中,被称为发布者(Publisher)。发布者有一个subcribe()
方法,该方法允许使用者在POJO可用时获取它。发布者可以通过以下两种形式返回结果:
- Mono返回0个或1个结果,
- Flux返回0个或多个结果,可能是无限个。
Mono
Mono是Publisher的一种,返回0或者1个结果,也可以返回一个Optional。
举个例子,我们有如下代码:
public Person findCurrentUser() {
if (isAuthenticated()) return new Person("Jane", "Doe");
else return null;
}
在Java8中,我们可以这样写:
public Optional<Person> findCurrentUser() {
if (isAuthenticated()) return Optional.of(new Person("Jane", "Doe"));
else return Optional.empty();
}
如果我们使用响应式流,可以写成这样:
public Mono<Person> findCurrentUser() {
if (isAuthenticated()) return Mono.just(new Person("Jane", "Doe"));
else return Mono.empty();
}
Flux
Flux也是Publisher的一种,返回0或者多个结果,甚至可以返回无数个结果。通常将其用作,集合collection、数组array、或者流stream的响应式计数方式。
举个例子,我们有如下代码:
public List<Person> findAll() {
return Arrays.asList(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}
在Java8中,我们可以用Stream来实现:
public Stream<Person> findAll() {
return Stream.of(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}
如果我们使用Flux,可以这样实现:
public Flux<Person> findAll() {
return Flux.just(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}
从上面可以看到,响应式编程,与函数式编程很类似。
订阅者Subscriber
以下代码会在控制台输出什么呢?
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.log();
答案是什么都不会输出,响应式流使用push模型,每一项都按照发布者的速度推送到流上,而不管订阅者是否能够跟随。但是也不用担心,因为有背压(back pressure)的存在,可以保证正确。
上述代码通常会认为输出10,但是其实不是的。因为响应流失延迟的,也可以称为懒惰的,只要没有订阅者就不会启动。所以,订阅者也是必不可少的一部分。
这个跟JAVA8中的惰性求值、及早求值概念类似。
异步特性
发布者在本质上是异步的,然后并非总是异步的,是否异步,取决于发布者的类型。看如下代码:
AtomicInteger sum = new AtomicInteger(0);
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.subscribe(sum::set);
log.info("Sum is: {}", sum.get());
不同的人可能有不同的答案:1、要么输出10,因为对数字求和;2、要么输出0,因为是异步操作,在执行log的时候,还未进行sum求和。
正确的答案是第一种:输出10。因为Flux.just()默认情况下使用当前线程,因此程序在执行到达日志语句时已经计算出了结果。
那下面这个代码会输出什么吗?
AtomicInteger sum = new AtomicInteger(0);
Flux
.just(1, 2, 3, 4)
.subscribeOn(Schedulers.elastic())
.reduce(Integer::sum)
.subscribe(sum::set);
logger.info("Sum is: {}", sum.get());
将会输出0,因为在这里使用了subscribeOn
方法,将使得订阅者在异步线程执行。因此,根据响应流的性质,它可以是同步的,也可以是异步的。上述代码可以将logger打印采用lambda实现:
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.susbcribe(sum -> logger.info("Sum is: {}", sum);
构建自己的流
Project Reactor
提供了很多内置的发布者。然而,在某些情况下,我们必须创建自己的Publisher
。Mono和Flux都提供了create()
方法来构建自定义的流。
例如,我们想使用响应流的Twitter4J库,可以写成这样:
return Flux.create(sink -> {
TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
twitterStream.onStatus(sink::next);
twitterStream.onException(sink::error);
sink.onCancel(twitterStream::shutdown);
});
这也是一个无限流的例子,因为推文的数量将永远持续下去(或者直到Twitter关闭)。
热流和冷流
热流称为Host Stream,冷流称为Cold Stream。两者的区别在于:当我们多个订阅者使用冷观察的时候,流将重新启动。热观察的时候,流将复用。默认情况下,流是冷流。
如下方代码,默认采用的是冷流:
Flux<Integer> numbers = Flux
.just(1, 2, 3, 4)
.log();
numbers
.reduce(Integer::sum)
.subscribe(sum -> logger.info("Sum is: {}", sum));
numbers
.reduce((a, b) -> a * b)
.subscribe(product -> logger.info("Product is: {}", product));
上面的例子中,1到4被发布两次,一次针对第一个订阅者,一次针对第二个订阅者。
但是在有些情况下,我们不希望流重头开始。例如HTTP Request
,在这种情况下,我们可以使用热流。在Project Reactor
中,我们可以使用share()
方法(针对Flux)或者cache()
方法(针对Mono),代码如下所示:
Flux<Integer> numbers = Flux
.just(1, 2, 3, 4)
.log()
.share();
numbers
.reduce(Integer::sum)
.subscribe(sum -> logger.info("Sum is: {}", sum));
numbers
.reduce((a, b) -> a * b)
.subscribe(product -> logger.info("Product is: {}", product));
通过share()
方法,1到4只发布一次,被两个订阅者共享。
Comments | NOTHING