代码之家  ›  专栏  ›  技术社区  ›  Venu

将状态对象保持在反应流中/当使用项目反应器Mono或通量时,方法级对象是否线程安全

  •  0
  • Venu  · 技术社区  · 2 年前

    我正在开发一个对各种API调用的反应流序列,并希望维护一个状态对象。有没有一种方法可以使用 https://projectreactor.io/ ? 以下是我所做的和我想完成的

    public Mono<MyDTO> test(AnotherDTO req) {
            MyDTO mDTO1 = new MyDTO(req); // ---> line_1
            // ... some code to update myDTO1 based on req object
            return api_1.get(mDTO1.getName()).flatMap(api_1_Res -> { // ---> 1
                mDTO1.setApi1Response(api_1_Res);
                return Mono.just(mDTO1);
            }).flatMap(monoResp -> {
                // ... some code to get a value from monoResp
                String var = monoResp.getApi1Response().getSomeValue();
                return api_2.get(var).flatMap(api_2_Res -> {
                    monoResp.setApi2Response(api_2_Res);
                    return Mono.just(monoResp);
                });
            }).flatMap(monoResp -> {
                // ... some code to get a value from monoResp
                String var1 = monoResp.getApi1Response().getSomeValue2();
                String var2 = monoResp.getApi2Response().getSomeInt();
                return api_3.get(var1, var2).flatMap(api_3_Res -> {
                    monoResp.setApi3Response(api_3_Res);
                    return Mono.just(monoResp);
                });
            }).flatMap(monoResp -> {
                saveIntoDB(monoResp);
            }).onErrorResume(errRes -> {
                log.error("exception {}", errRes.getMessage());
                return Mono.empty();
            });
        }
    

    问题:

    1. 如何维护状态对象,以便在flatMap操作(1,2,3)中的任何一个发生错误时,myDTO都可以包含状态,直到之前的flatMap动作,并在onErrorResume上获得访问权限?

    老问题: 我可以使用上述方法来保持myDTO的状态吗 如果发生错误,如何获取myDTO的状态,直到出现错误为止?就像在api_2发生错误一样,获取myDTO直到api_1响应,这样我就可以在出现错误时将其保存在DB中。

    0 回复  |  直到 2 年前
        1
  •  0
  •   Igor Artamonov    2 年前

    当您需要在步骤之间传递可变状态时,最好创建一个新实例(即具有一个不可变对象),并在每个步骤上返回一个新副本。这样可以确保在重复或并行执行某些步骤时不会陷入冲突状态。因此,通过每次创建一个新实例,您可以正确地完成所有操作。然而,在您的情况下,这并不是真正必要的,因为您使用简单的Monos进行简单的逐步流程。

    第二个问题,关于错误处理,更为复杂。根据我的经验,使用Reactor很容易犯错误,并忽略一些处理错误的地方。

    总的来说,我建议采取两种方法。首先,你总是可以创建自己的 Exception 类,该类将包装DTO和原始异常:

    class DtoException extends RuntimeException {
         final DTO dto;
         public DtoException(DTO dto, Throwable cause) {
             this.dto = dtop;
             super(cause);
         }
    }
    
    // and then
    
    api.getSomething(dto.getField()).onErrorMap(t -> new DtoException(dto, t))
    

    然后在下游的某个地方用 onErrorResume .

    或者,您可以编写一个函数来保存状态并直接使用它:

    Function<Mono<T>, Mono<T>> saveFailed(dto) {
       return { flow ->
          flow.onErrorResume( t -> { 
              dto.save();
              Mono.error(t)
          }
       }
    }
    
    // and then:
    
    api.getSomething(dto.getField()).transform(saveFailed(dto))
    

    上面的代码可以替换为 Function<Throwable, Mono<DTO>> 并将其直接传递给 onErrorResume ,如果在您的情况下可读性更强的话。