代码之家  ›  专栏  ›  技术社区  ›  Bojan Vukasovic

Flux-带webclient的并行flatMap-固定批处理速率限制

  •  0
  • Bojan Vukasovic  · 技术社区  · 4 年前

    我的代码是:

    return Flux.fromIterable(new Generator()).log()
            .flatMap(
                s ->
                    webClient
                        .head()
                        .uri(
                            MessageFormat.format(
                                "/my-{2,number,#00}.xml",
                                channel, timestamp, s))
                        .exchangeToMono(r -> Mono.just(r.statusCode()))
                        .filter(HttpStatus::is2xxSuccessful)
                        .map(r -> s),
                6)  //only request 6 segments in parallel via webClient
            .take(6) //we need only 6 200 OK responses
            .sort();
    

    它只是要求 HEAD ,直到前6个请求成功。

    并行化在这里工作,但问题是,在其中一个请求完成后,它会立即触发下一个请求(以保持并行化级别为6)。这里我需要的是并行化级别为6,但要分批进行。所以-触发6个请求,等待全部完成,再次触发6个请求。。。

    这是 log() 以上内容:

    : | request(6)
    : | onNext(7)
    : | onNext(17)
    : | onNext(27)
    : | onNext(37)
    : | onNext(47)
    : | onNext(57)
    : | request(1) <---- from here NOT OK; wait until all complete and request(6)
    : | onNext(8)
    : | request(1)
    : | onNext(18)
    : | request(1)
    : | onNext(28)
    : | request(1)
    : | onNext(38)
    : | request(1)
    : | onNext(48)
    : | request(1)
    : | onNext(58)
    : | cancel()
    

    更新

    这是我用缓冲区尝试的:

    return Flux.fromIterable(new Generator())
            .buffer(6)
            .flatMap(Flux::fromIterable)
            .log()
            .flatMap(
                s ->
                    webClient
                        .head()
                        .uri(
                            MessageFormat.format(
                                "/my-{2,number,#00}.xml",
                                channel, timestamp, s))
                        .exchangeToMono(r -> Mono.just(r.statusCode()))
                        .filter(HttpStatus::is2xxSuccessful)
                        .map(r -> s),
                6)  //only request 6 segments in parallel via webClient
            .take(6)
            .sort();
    
    0 回复  |  直到 4 年前
        1
  •  1
  •   Bojan Vukasovic    4 年前

    好的,看来我有了有效的代码。我在这里使用 window :

    return Flux.fromIterable(new Generator())
            .window(6) //group 1,2,3,4,5,6,7... into [0,1,2,3,4,5],[6,7..,11],[12,..,17]
            .log()
            .flatMap(
                s -> s.log().flatMap(x -> webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, x))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> x), 6), 1)  //1 means take only 1 array ([0,1,2,3,4,5]). 6 means take in parallel all from array (0,1,2,3,4,5)
            .take(6, true) //pass through only 6 elements (cancel afterwards)
            .sort();