我有两个请求:
Flux<ProductID> getProductIds() { return this.webClient.get() .uri(PRODUCT_ID_URI) .accept(MediaType.APPLICATION_STREAM_JSON) .retrieve() .bodyToFlux(ProductID.class); } Mono<Product> getProduct(String id) { return this.itemServiceWebClient.get() .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}") .build(id)) .accept(MediaType.APPLICATION_STREAM_JSON) .exchange() .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class)); }
Flux<Product> getProducts() { return Flux.create(sink -> this.gateway.getProductIds() .doOnComplete(() -> { log.info("PRODUCTS COMPLETE"); sink.complete(); }) .flatMap(productId -> this.getProduct(productId.getID())) .subscribe(product -> { log.info("NEW PRODUCT: " + product); sink.next(product); })); }
当我调用此函数时,会得到以下输出:
PRODUCTS COMPLETE NEW PRODUCT: ... NEW PRODUCT: ... ....
当然,流在结果出现之前就关闭了,因为异步mono映射。如何保持这种非阻塞状态,同时确保在调用on complete之前得到结果?
假设 getProducts 是一个控制器方法,如果要将这些产品添加到要在视图模板中呈现的模型中,可以这样解决此问题:
getProducts
@GetMapping("/products") public String getProducts(Model model) { Flux<Product> products = this.gateway.getProductIds() .flatMap(productId -> this.getProduct(productId.getID())); model.put("products", products); // return the view name return "showProducts"; }