# 响应式

JetLinks使用Project Reactor作为响应式编程框架,从网络层(webfluxvert.x)到持久层(r2dbcelastic)全部 封装为非阻塞响应式调用。

响应式可以理解为观察者模式,通过订阅发布数据流中的数据对数据进行处理。 Project Reactor提供了强大的API,简化多线程和异步编程开发,降低了对数据各种处理方式的复杂度。如果你已经大量使用了java8 stream api,使用reactor将很容易上手。

说明

响应式传统编程最大的区别是:

响应式中的方法调用是在构造一个流以及处理流中数据的逻辑。当中产生了数据(发布、订阅),才会执行构造好的逻辑。

传统编程则是直接执行逻辑获取结果。

# 优点

非阻塞,大大简化多线程异步编程。 集成netty等框架可实现更高的网络并发处理能力。 API丰富,实现很多复杂的功能只需要几行代码。例如:

  1. 前端展示实时数据处理进度。
  2. 请求撤销,可获取到连接断开事件。
  3. 定时(interval),延迟(delay),超时(timout),以及细粒度的流量控制(limitRate)。
  4. 分组(groupBy),聚合(collectreduce)操作等。

# 缺点

调试不易,异常栈难跟踪,对开发人员有更高的要求。

此问题可以通过优化代码结构来解决。比如:避免在响应式操作符中直接业务逻辑, 正确的做法是将业务逻辑抽离为独立的函数(方法),然后使用响应式来进行组合。

警告

响应式只是一个编程模型,并不能直接提高系统的并发处理能力。 通常与netty(reactor-netty)等框架配合,从上(网络)到下(持久化)全套实现非阻塞响应式才有意义。

# 选择合适的操作符

系统中大量使用到了reactor,其核心类只有2个:Flux(0-n个数据的流),Mono(0-1个数据的流)。 摒弃传统编程的思想,熟悉FluxMono操作符(API),就可以很好的使用响应式编程了。

常用操作符:

  1. map: 转换流中的元素: flux.map(UserEntity::getId)
  2. mapNotNull: 转换流中的元素,并忽略null值.(reactor 3.4提供)
  3. flatMap: 转换流中的元素为新的流: flux.flatMap(this::findById)
  4. flatMapMany: 转换Mono中的元素为Flux(1个转多个): mono.flatMapMany(this::findChildren)
  5. concat: 将多个流连接在一起组成一个流(按顺序订阅) : Flux.concat(header,body)
  6. merge: 将多个流合并在一起,同时订阅流: Flux.merge(save(info),saveDetail(detail))
  7. zip: 压缩多个流中的元素: Mono.zip(getData(id),getDetail(id),UserInfo::of)
  8. then: 上游流完成后执行其他的操作.
  9. doOnNext: 流中产生数据时执行.
  10. doOnError: 发送错误时执行.
  11. doOnCancel: 流被取消时执行.如: http未响应前,客户端断开了连接.
  12. onErrorContinue: 流发生错误时,继续处理数据而不是终止整个流.
  13. defaultIfEmpty: 当流为空时,使用默认值.
  14. switchIfEmpty: 当流为空时,切换为另外一个流.
  15. as: 将流作为参数,转为另外一个结果:flux.as(this::save)

完整文档请查看官方文档

# 代码格式化

使用reactor时,应该注意代码尽量以.换行并做好相应到缩进。例如:


//错误
return paramMono.map(param->param.getString("id")).flatMap(this::findById);

//建议
return paramMono
            .map(param->param.getString("id")) 
            .flatMap(this::findById);

# lamdba

避免在一个lambda中编写大量的逻辑代码。推荐参考领域模型,将具体当逻辑放到对应到实体或者领域对象中。例如:


//错误
return devicePropertyMono
        .map(prop->{
            Map<String,Object> map = new HashMap<>();
            map.put("property",prop.getProperty());
            ....
            return map;
        })
        .flatMap(this::doSomeThing)

//建议
//在DeviceProperty中编写toMap方法实现上面lambda中到逻辑.
return devicePropertyMono
        .map(DeviceProperty::toMap)
        .flatMap(this::doSomeThing)

# null处理

数据流中到元素不允许为null,因此在进行数据转换的时候要注意null处理。例如:


//存在缺陷
return this.findById(id)
           .map(UserEntity::getDescription); //getDescription可能返回null,为null时会抛出空指针,



说明

reactor 3.4后可以使用以下方式来处理可能存在null的map操作:

return this.findById(id)
           .mapNotNull(UserEntity::getDescription); 

# 非阻塞与阻塞

默认情况下,reactor的调度器由数据的生产者(Publisher)决定。在WebFlux中则是netty的工作线程。 为了防止工作线程被阻塞导致服务崩溃,在一个请求的流中,禁止执行存在阻塞(如执行JDBC)可能的操作的。如果无法避免阻塞操作,应该指定调度器如:

paramMono
  .publishOn(Schedulers.elastic()) //指定调度器去执行下面的操作
  .map(param-> jdbcService.select(param))

# 上下文

在响应式中,大部分情况是禁止使用ThreadLocal的(可能造成内存泄漏)。因此基于ThreadLocal的功能都无法使用。reactor中引入了上下文,在一个流中,可共享此上下文 。通过上下文进行变量共享,例如事务,权限等功能。例如:


//从上下文中获取
@GetMapping
public Mono<UserInfo> getCurrentUser(){
    return Mono.subscriberContext()
            .map(ctx->userService
                .findById(ctx
                    .getOrEmpty("userId")
                    .orElseThrow(IllegalArgumentException::new));
}

//定义过滤器设置数据到上下文中
class MyFilter implements WebFilter{
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain){
        return chain.filter(exchange)
            .subscriberContext(Context.of("userId",...))
    }
}

警告

在开发中应该将多个流组合为一个流,而不是分别处理.例如:

//错误
return flux.doOnNext(data->this.save(data).subscribe());

//正确
return flux.flatMap(this::save);

//错误,没有将流组合在一起
request.flatMap(this::save);
Mono<Void> result = this.notifySaveSuccess();
return result;

//正确
return request
    .flatMap(this::save)
    .then(this.notifySaveSuccess());

# FAQ: 我写的操作看上去是正确的,但是没有执行.

有3种可能:上游流为空多个流未组合在一起在不支持响应式的地方使用了响应式

# 1. 上游流为空

例:






 
 





public Mono<Response> handleRequest(Request request){
   
 return this
      .findOldData(request)
      .flatMap(old -> {
            //这里为什么不执行?
            return ....
      })
 
}

说明

findOldData返回的流为空时,下游的flatMap等需要操作流中元素的操作符是不会执行的。 可以通过switchIfEmpty操作符来处理空流的情况。 例如:

 return this
      .findOldData(request)
      //处理没获取到数据的情况
      .switchIfEmpty(Mono.error(()->new NotFoundException("error.data_not_found")))
      .flatMap(old -> {
            return ....
      })

如果flatMapswitchIfEmpty中的逻辑都没执行,那可能是下面一种情况.

# 2. 多个流未组合在一起

例:


public Result handleRequest(Request request){
 
 //处理请求
 service.handleRequest(request);

 return ok;
 
}

说明

  1. 只要方法返回值是Mono或者Flux,都不能单独行动。
  2. 只要方法中调用了任何响应式操作,那这个方法也应该是响应式。(返回Mono或者Flux)

因此正确的写法是:

public Mono<Result> handleRequest(Request request){
 return service
         //处理请求
         .handleRequest(request)
         //记录日志
         .then(saveLog(request))
         //返回结果
         .thenReturn(ok);
}

# 3. 在不支持响应式的地方使用响应式






 
 




public Mono<Result> handleRequest(Request request){

return service
         //处理请求
         .handleRequest(request)
         //记录日志 此为错误的用法
         .doOnNext(response-> saveLog(request,response) )
         //返回结果
         .thenReturn(ok);
}

说明

doOnNext方法的语义以及参数Consumer<T>可知,此方法是不支持响应式的(Consumer<T>只有参数没有返回值)。因此不能在此方法中使用响应式操作。

正确的写法:

return service
         //处理请求
         .handleRequest(request)
         //记录日志 此为错误的用法
         .flatMap(response-> saveLog(request,response) )
         //返回结果
         .thenReturn(ok);

# FAQ: 我想获取流中的元素怎么办

不要试图从流中获取数据出来,而是先思考需要对流中元素做什么。 需要对流中的数据进行操作时,都应该使用操作符来处理,根据Flux或者Mono提供的操作符API进行组合操作。比如:

传统:


public List<Book> getAllBooks(){
    List<BookEntity> bookEntities = repository.findAll();

    List<Book> books = new ArrayList(bookEntities.size());

    for(BookEntity entity : bookEntities){
        Book book = entity.copyTo(new Book());
        books.add(book);
    }

    return books;
}

响应式:

public Flux<Book> getAllBooks(){
return repository
        .findAll()
        .map(entity-> entity.copyTo(new Book()))
}

# FAQ: 我需要在非响应式方法中使用响应式怎么办

如果需要阻塞获取结果,可以使用flux.block(timeout)

如果需要异步获取结果,可以使用flux.subscribe(data->{ },error->{ })

如:

public void handleRequest(Request request){

  //logService.saveLog(request).block()
    
  logService
    .saveLog(request)
    .subscribe(
        result->{
            log.debug("保存成功 {}",request)
        },
        error->{
            log.warn("保存失败 {}",request,error);
        }
    )
}

# 相关资料

  1. reactive-streams
  2. project-reactor
  3. 使用 Reactor 进行反应式编程