代码之家  ›  专栏  ›  技术社区  ›  Chaitanya

无法将Flux消息发送到模拟ReactorNettyWebSocketClient

  •  0
  • Chaitanya  · 技术社区  · 1 年前

    在我的JavaSpring引导应用程序中,我使用反应式库来连接并从websocket进行流式传输。我的中有以下依赖项 pom.xml 文件

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webflux</artifactId>
                <version>3.1.4</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
                <version>3.1.4</version>
            </dependency>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-test</artifactId>
                <version>3.4.8</version>
                <scope>test</scope>
            </dependency>
    

    我打开websocket连接的生产代码如下

    @Autowired private ReactorNettyWebSocketClient client;
    
     public <T> Flux<T> connectToWebSocketAndParseMessages(
          String websocketUrl, Function<String, T> deserializer) {
        return Flux.create(
            sink -> {
              try {
                client
                    .execute(new URI(websocketUrl), getWebSocketHandler(deserializer, sink))
                    .retryWhen(getRetrySpec())
                    .subscribe(); // Subscribe to start the WebSocket connection
              } catch (URISyntaxException e) {
                sink.error(e);
              }
            });
      }
    
      @NotNull
      private <T> WebSocketHandler getWebSocketHandler(
          Function<String, T> deserializer, FluxSink<T> sink) {
        return session -> {
          Flux<WebSocketMessage> messageFlux = session.receive();
          return messageFlux
              .map(
                  message -> {
                    String messagePayload = message.getPayloadAsText();
                    try {
                      T model = deserializer.apply(messagePayload);
                      sink.next(model); // Emit the parsed model
                    } catch (Exception e) {
                      sink.error(e); // Handle parsing errors by signaling an error
                    }
                    return messagePayload;
                  })
              .then();
        };
      }
    

    现在,我正在尝试对该方法进行单元测试 connectToWebSocketAndParseMessages 为此,我嘲笑 ReactorNettyWebSocketClient 以及客户端创建的会话。我的测试用例看起来像

    @SpringBootTest
    @TestPropertySource(locations = "classpath:test.properties")
    class WebSocketGatewayTest {
    
      @Autowired private WebSocketGateway webSocketGateway;
    
      @MockBean private ReactorNettyWebSocketClient mockClient;
    
      @Test
      public void testConnectToWebSocketAndParseMessages1() {
        // Given
        Function<String, Integer> deserializer = Integer::parseInt;
        String websocketUrl = "ws://example.com";
        WebSocketSession session = mock(WebSocketSession.class);
        WebSocketMessage message1 = mock(WebSocketMessage.class);
        WebSocketMessage message2 = mock(WebSocketMessage.class);
    
        when(mockClient.execute(any(URI.class), any()))
            .thenReturn(Flux.just(session).then())
            .thenAnswer(
                invocation -> {
                  WebSocketHandler handler = invocation.getArgument(1);
                  handler.handle(session); // Simulate WebSocket session handling
                  return Mono.empty();
                });
    
        when(session.receive()).thenReturn(Flux.just(message1, message2));
        when(message1.getPayloadAsText()).thenReturn("42");
        when(message2.getPayloadAsText()).thenReturn("100");
    
        // When
        Flux<Integer> result =
            webSocketGateway.connectToWebSocketAndParseMessages(websocketUrl, deserializer);
    
        // Then
        StepVerifier.create(result).expectNext(42).expectNext(100).expectComplete().verify();
      }
    // remaining tests
    

    现在,一旦测试运行,它就不会完成,并继续等待从websocket接收消息。模拟 when(mockClient.execute(any(URI.class), any())) 不知何故,没有向websocket发送任何消息,这导致测试无法完成。

    这里关于如何修改此测试以测试反应式websocket连接的行为的任何想法。谢谢

    0 回复  |  直到 1 年前
    推荐文章