代码之家  ›  专栏  ›  技术社区  ›  billdoor

Reactor/WebFlux实现一个反应式http新闻自动收录器

  •  3
  • billdoor  · 技术社区  · 7 年前

    我有一个非常简单的请求,但我无法在不泄漏资源的情况下完成它。

    我想返回类型为的响应 application/stream+json ,以某人发布的新闻事件为特色。我不想使用Websockets,不是因为我不喜欢它们,我只是想知道如何使用流。

    为此,我需要返回 Flux<News> 从我的restcontroller,只要有人发布任何消息,就会不断收到消息。

    我的尝试是创建一个发布者:

    public class UpdatePublisher<T> implements Publisher<T> {
    
        private List<Subscriber<? super T>> subscribers = new ArrayList<>();
    
        @Override
        public void subscribe(Subscriber<? super T> s) {
            subscribers.add(s);
        }
    
        public void pushUpdate(T message) {
            subscribers.forEach(s -> s.onNext(message));
        }
    
    }
    

    还有一个简单的新闻对象:

    public class News {
        String message;
        // Constructor, getters, some properties omitted for readability...
    }
    

    和发布新闻的端点分别获取新闻流

    // ...
    
    private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();
    
    @GetMapping(value = "/news/ticker", produces = "application/stream+json")
    public Flux<News> getUpdateStream() {
         return Flux.from(updatePublisher).map(News::new);
    }
    
    @PutMapping("/news")
    public void putNews(@RequestBody News news) {
        updatePublisher.pushUpdate(news.getMessage());
    }
    

    这是可行的,但我无法取消订阅或再次访问任何给定的订阅-因此,一旦客户端断开连接 updatePublisher 只会继续推进越来越多的死区通道,因为我无法呼叫 onCompleted() 订阅上的处理程序。

    TL;DL:

    可以将消息从不同的线程推送到可能的无止境流量上,并且仍然可以根据需要终止流量,而不依赖于对等异常重置或类似的内容吗?

    1 回复  |  直到 7 年前
        1
  •  0
  •   Brian Clozel    7 年前

    你应该 从不 尝试实现自己的 Publisher 接口,因为它归结为正确实现反应流。这正是你在这里面临的问题。

    相反,您应该使用Reactor本身提供的一个生成器操作符(这实际上是一个Reactor问题,与Spring-WebFlux无关)。

    在这种情况下, Flux.create Flux.push 如果您的代码使用某种类型的事件监听器将事件推送到流中,则可能是最好的候选者。 See the reactor project reference documentation on that

    如果没有更多的细节,很难给出解决问题的具体代码示例。不过,这里有一些建议:

    • 你可能想 .share() 如果您想要一些类似多播的通信模式,那么所有订阅者的事件流
    • 注意你想要的推/拉/推+拉模式;背压在这里应该如何工作?如果我们生成订阅者可以处理的更多事件,会怎么样?
    • 此模型仅适用于单个应用程序实例。如果您希望在多个应用程序实例上使用此功能,那么您可能希望使用代理查看消息传递模式