代码之家  ›  专栏  ›  技术社区  ›  Mulgard

如何用mono映射通量?

  •  1
  • Mulgard  · 技术社区  · 6 年前

    我有两个请求:

    Flux<ProductID> getProductIds() {
        return this.webClient.get()
                .uri(PRODUCT_ID_URI)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToFlux(ProductID.class);
    }
    
    Mono<Product> getProduct(String id) {
        return this.itemServiceWebClient.get()
                .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                        .build(id))
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
    }
    

    Flux<Product> getProducts() {
        return Flux.create(sink -> this.gateway.getProductIds()
                .doOnComplete(() -> {
                    log.info("PRODUCTS COMPLETE");
    
                    sink.complete();
                })
                .flatMap(productId -> this.getProduct(productId.getID()))
                .subscribe(product -> {
                    log.info("NEW PRODUCT: " + product);
    
                    sink.next(product);
                }));
    }
    

    当我调用此函数时,会得到以下输出:

    PRODUCTS COMPLETE
    NEW PRODUCT: ...
    NEW PRODUCT: ...
    ....
    

    当然,流在结果出现之前就关闭了,因为异步mono映射。如何保持这种非阻塞状态,同时确保在调用on complete之前得到结果?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Brian Clozel    6 年前

    假设 getProducts 是一个控制器方法,如果要将这些产品添加到要在视图模板中呈现的模型中,可以这样解决此问题:

    @GetMapping("/products")
    public String getProducts(Model model) {
    
        Flux<Product> products = this.gateway.getProductIds()
                .flatMap(productId -> this.getProduct(productId.getID()));
        model.put("products", products);
        // return the view name
        return "showProducts";
    }