代码之家  ›  专栏  ›  技术社区  ›  greengold

反应式WebSocketClient数据发布

  •  0
  • greengold  · 技术社区  · 6 年前

    我正在尝试使用从WebSocket读取数据 ReactorNettyWebSocketClient 但我就是无法连接到它的API。问题是,我接收的所有数据都在lambda样式的WebSocketHandler(1)的内部可用,但我希望在.subscribe on client.execute(2)之后将其提供给订阅服务器。

    WebSocketClient client = new ReactorNettyWebSocketClient();
            client.execute(
                    URI.create(URL),
                    session -> session.send(
                            Mono.just(session.textMessage(pairRqStr)))
                            .thenMany(session.receive()
                                    .map(WebSocketMessage::getPayloadAsText)
                                    .map(this::toResp)
                                    .onErrorContinue((throwable, o) -> throwable.getMessage())
                            )
                            .log() // #1
                            .then()
            )
                    .log()
                    .subscribe(System.out::println); // #2
    

    有点迷茫和新的,所以,请引导我。

    0 回复  |  直到 6 年前
        1
  •  0
  •   szurawar    6 年前

    要发送或接收消息,必须首先连接到通道。这就是为什么 client.execute 退货 Mono<Void> 也就是说,它不返回任何数据,只是表示手持电话的完成或失败。 如果它会回来的话。 Flux<WebSocketMessage> ,您如何知道手持设备是否成功完成?

    如果要访问lambda外部的通道,则无法通过实现handler方法来进行访问:

            Consumer<WebSocketMessage> printingConsumer = webSocketMessage -> System.out.println(webSocketMessage.getPayloadAsText());
    
            client.execute(URI.create(URL), session -> handle(session, printingConsumer));
        }
    
    
        public Mono<Void> handle(WebSocketSession session, Consumer<WebSocketMessage> consumer) {
            return session.receive()
                    .doOnNext(consumer::accept)
                    .then();
        }
    
        2
  •  0
  •   greengold    6 年前

    这是我最后做的,但还是不太喜欢:

        private Flux<WebSocketMessage> requestData(String req) {
            WebSocketClient client = new ReactorNettyWebSocketClient();
            return ConnectableFlux.create(sub -> {
                client.execute(
                        URI.create(URL),
                        session -> session.send(
                                Mono.just(session.textMessage(req)))
                                .thenMany(session.receive().doOnNext(sub::next))
                                .then()
                )
                        .log()
                        .subscribe();
            });
        }