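In my Java Spring Boot application, I use the reactive libraries to connect to a websocket and stream messages from it. I have the following dependencies in my pom.xml file: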
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.4.8</version>
  <scope>test</scope>
</dependency>
My production code that opens the websocket connection is as follows:
@Autowired private ReactorNettyWebSocketClient client;

public <T> Flux<T> connectToWebSocketAndParseMessages(
    String websocketUrl, Function<String, T> deserializer) {
  return Flux.create(
      sink -> {
        try {
          client
              .execute(new URI(websocketUrl), getWebSocketHandler(deserializer, sink))
              .retryWhen(getRetrySpec())
              .subscribe(); // Subscribe to start the WebSocket connection
        } catch (URISyntaxException e) {
          sink.error(e);
        }
      });
}

@NotNull
private <T> WebSocketHandler getWebSocketHandler(
    Function<String, T> deserializer, FluxSink<T> sink) {
  return session -> {
    Flux<WebSocketMessage> messageFlux = session.receive();
    return messageFlux
        .map(
            message -> {
              String messagePayload = message.getPayloadAsText();
              try {
                T model = deserializer.apply(messagePayload);
                sink.next(model); // Emit the parsed model
              } catch (Exception e) {
                sink.error(e); // Handle parsing errors by signaling an error
              }
              return messagePayload;
            })
        .then();
  };
}
Now I am trying to unit test the method connectToWebSocketAndParseMessages. To do this, I mock ReactorNettyWebSocketClient as well as the session created by the client. My test case looks like this:
@SpringBootTest
@TestPropertySource(locations = "classpath:test.properties")
class WebSocketGatewayTest {
  @Autowired private WebSocketGateway webSocketGateway;
  @MockBean private ReactorNettyWebSocketClient mockClient;

  @Test
  public void testConnectToWebSocketAndParseMessages1() {
    // Given
    Function<String, Integer> deserializer = Integer::parseInt;
    String websocketUrl = "ws://example.com";
    WebSocketSession session = mock(WebSocketSession.class);
    WebSocketMessage message1 = mock(WebSocketMessage.class);
    WebSocketMessage message2 = mock(WebSocketMessage.class);
    when(mockClient.execute(any(URI.class), any()))
        .thenReturn(Flux.just(session).then())
        .thenAnswer(
            invocation -> {
              WebSocketHandler handler = invocation.getArgument(1);
              handler.handle(session); // Simulate WebSocket session handling
              return Mono.empty();
            });
    when(session.receive()).thenReturn(Flux.just(message1, message2));
    when(message1.getPayloadAsText()).thenReturn("42");
    when(message2.getPayloadAsText()).thenReturn("100");

    // When
    Flux<Integer> result =
        webSocketGateway.connectToWebSocketAndParseMessages(websocketUrl, deserializer);

    // Then
    StepVerifier.create(result).expectNext(42).expectNext(100).expectComplete().verify();
  }

  // remaining tests
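Now, when the test runs, it never completes and keeps waiting to receive messages from the websocket. The mock set up with when(mockClient.execute(any(URI.class), any())) somehow does not deliver any messages to the websocket handler, which prevents the test from finishing.

Any ideas on how to modify this test so that it exercises the behavior of the reactive websocket connection? Thanks.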
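
One possible reading of the hang, sketched in plain Java with no Reactor or Mockito involved: with Mockito's consecutive stubbing, thenReturn(...) answers the first call to execute and thenAnswer(...) only applies from the second call on. If execute is called only once, the handler passed to it is never invoked, so nothing reaches the sink. The names FakeClient, connect and StubbedExecuteSketch below are hypothetical stand-ins for the mocked client and the gateway method; this is an analogy under those assumptions, not the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class StubbedExecuteSketch {

    /** Hypothetical stand-in for the mocked client's execute(uri, handler). */
    public interface FakeClient {
        void execute(Consumer<List<String>> handler);
    }

    /**
     * Mirrors the gateway method: it hands the client a handler that parses
     * each payload and pushes the result into a sink (here a plain list).
     */
    public static <T> List<T> connect(FakeClient client, Function<String, T> deserializer) {
        List<T> sink = new ArrayList<>();
        client.execute(payloads -> payloads.forEach(p -> sink.add(deserializer.apply(p))));
        return sink;
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("42", "100");

        // Like a stub that returns a canned value: the handler is never touched.
        FakeClient ignoresHandler = handler -> { };

        // Like a stub that actually drives the handler with the session's messages.
        FakeClient invokesHandler = handler -> handler.accept(payloads);

        List<Integer> none = connect(ignoresHandler, Integer::parseInt);
        List<Integer> parsed = connect(invokesHandler, Integer::parseInt);

        System.out.println(none);   // prints []
        System.out.println(parsed); // prints [42, 100]
    }
}
```

Separately, the production code shown never calls sink.complete(), so even once messages do reach the sink, expectComplete() would presumably keep waiting unless the verifier cancels or the gateway signals completion.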