代码之家  ›  专栏  ›  技术社区  ›  Mulgard

如何在subscribe中用阻塞操作包装通量?

  •  1
  • Mulgard  · 技术社区  · 6 年前

    在文档中,您应该将阻塞代码包装到 Mono : http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

    但它并没有写下如何真正做到这一点。

    我有以下代码:

    @PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
        something.subscribe(something -> {
            // some blocking operation
        });
    
        // how to return Mono<Void> here?
    }
    

    如果我还一个 Mono.empty

    第二个问题是:如何像文档中建议的那样包装阻塞代码:

    Mono blockingWrapper = Mono.fromCallable(() -> { 
        return /* make a remote synchronous call */ 
    });
    blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); 
    
    1 回复  |  直到 6 年前
        1
  •  4
  •   Brian Clozel    6 年前

    你不应该打电话 subscribe 在控制器处理程序中,但只需构建一个反应管道并返回它。最终,HTTP客户机将请求数据(通过springwebflux引擎),这就是向管道订阅和请求数据的内容。

    手动订阅将使请求处理与另一个操作分离,这将1)取消对操作顺序的任何保证,2)如果另一个操作使用HTTP资源(如请求主体),则中断处理。

    在这种情况下,源不是阻塞的,而是只执行转换操作。所以我们最好用 publishOn 表示链的其余部分应该在特定的调度器上执行。如果这里的操作是I/O绑定的,那么 Scheduler.elastic() 是最好的选择,如果它的CPU限制那么 Scheduler.paralell 这样更好。举个例子:

    @PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {
    
        return something.collectList()
          .publishOn(Scheduler.elastic())
          .map(things -> { 
             return processThings(things);
          })
          .then();        
    }
    
    public ProcessingResult processThings(List<Something> things) {
      //...
    }
    

    有关该主题的更多信息,请查看 Scheduler section in the reactor docs