Spring Integration flow loses subscriber

  •  2
  • ioreskovic  · 6 years ago

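    My Spring Integration flow receives org.springframework.web.reactive.socket.WebSocketMessage and does some processing on it, including working with Netty's ByteBuf. At some point, an exception occurs in my flow: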

    org.springframework.messaging.MessageHandlingException: error occurred in message handler [_org.springframework.integration.errorLogger.handler]; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    ...
    Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
        at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
        at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
        at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
        at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
        at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
        at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
        at java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161]
        at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161]
        at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]

    After that, processing of every binary WebSocket message fails with the following exception:

    2018-11-26T10:38:29,133 ERROR --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'binaryWebSocketMessageChannel'; nested exception is java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}]
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
        at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205)
        at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55)
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138)
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127)
        at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
        at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79)
    ...
    Caused by: java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages
        at org.springframework.util.Assert.state(Assert.java:94)
        at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
        ... 57 more
    

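    Any pointers on where to start in solving this? Also, under which circumstances do SI EIP components (routers, transformers, filters, service activators) unsubscribe from a channel?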

    For reference, the channel type is org.springframework.integration.channel.FluxMessageChannel

    Edit:

    My flow looks like this:

    WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                                (!BINARY) -> nullChannel
    

    (I know a filter would be a better fit here; I plan to refactor later)

    https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber

    1 answer  |  last active 6 years ago
        1
  •  2
  •   Artem Bilan    6 years ago

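    The point is the Publisher behind FluxMessageChannel: in Reactive Streams an error is a terminal signal, so once an exception propagates through it the subscription ends and the channel is left without subscribers for any further messages.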

    We are going to fix this with onErrorContinue() from Reactor 3.2, in version 5.1. To pick up the fix, consider upgrading your application to the latest Spring Boot, 2.1.1.

    While we fix the issue, you can consider swallowing the exception in your BinaryWsmToBytesTransformer instead of letting it bubble up to Reactor.
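
    As a rough illustration of the "swallow the exception" workaround, here is a minimal plain-Java sketch. It is not the actual BinaryWsmToBytesTransformer from the question; the class SafeTransform and the method swallowing are hypothetical names, and how the empty result is then dropped depends on how your flow is wired.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, not part of Spring Integration or of the question's code.
final class SafeTransform {

    private SafeTransform() {
    }

    // Wraps a transformation so that a runtime failure produces Optional.empty()
    // instead of an exception that would propagate into the reactive pipeline.
    static <I, O> Function<I, Optional<O>> swallowing(Function<I, O> delegate) {
        return input -> {
            try {
                return Optional.ofNullable(delegate.apply(input));
            } catch (RuntimeException e) {
                // Log and drop: the caller only ever sees an empty result.
                System.err.println("Dropping message after transform failure: " + e);
                return Optional.empty();
            }
        };
    }
}
```

    The transformer body would then call something like SafeTransform.swallowing(this::toBytes) (toBytes being whatever your conversion method is called) and handle the empty case itself, so the failure never reaches the subscriber on the FluxMessageChannel.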