代码之家  ›  专栏  ›  技术社区  ›  Erdem Aydemir

如何处理SSE连接关闭?

  •  0
  • Erdem Aydemir  · 技术社区  · 7 年前

    我有一个和示例代码块一样的端点流式处理。流式处理时,我通过 streamHelper.getStreamSuspendCount() .我正在更改状态中停止此异步方法。但是当浏览器关闭并且会话终止时,我不能访问这个异步方法。我正在更改状态时停止会话范围中的异步方法。但是当浏览器关闭并且会话终止时,我不能访问这个异步方法。会话关闭时如何访问此范围?

    @RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Integer> suspendCount(@PathVariable String columnId) {
        ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
        return streamHelper.getStreamSuspendCount(columnObject);
    }
    
    
    getStreamSuspendCount(ColumnObject columnObject) {
       ...
       //async flux
       Flux<?> newFlux = beSubscribeFlow.get(i);
       Disposable disposable = newFlux.subscribe();
       beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
       ...
       return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
                        .doOnNext(i -> {
                            System.out.println(i);
                        }));
    }
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   Simon Baslé    7 年前

    我认为部分问题是你试图 Disposable 在会话结束时要调用的。但是这样做的时候,你自己就订阅了序列。Spring框架还将订阅 Flux 返回者 getStreamSuspendCount ,并且需要取消订阅,以便SSE客户端得到通知。

    现在如何做到这一点?你需要的是一种“阀门”,一旦接收到外部信号,它就会取消信号源。这就是 takeUntilOther(Publisher<?>) 做。

    所以现在你需要一个 Publisher<?> 您可以将其与会话生命周期(更具体地说是会话关闭事件)联系起来:一旦它发出, takeUntilOther 将取消其源。

    2种选择:

    • 会话关闭事件在类似api:use的侦听器中公开 Mono.create
    • 您确实需要手动触发取消:使用 MonoProcessor.create() 当时间到来的时候,推动任何价值通过它

    下面是一些简化的示例,其中包含一些组合的API,以澄清:

    创建

    return theFluxForSSE.takeUntilOther(Mono.create(sink ->
        sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
    ));
    

    单处理器

    MonoProcessor<String> processor = MonoProcessor.create();
    beDisposeFlow.add(processor); // make it available to your session scope?
    return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
    

    让我们用计划任务模拟会话结束:

    Executors.newSingleThreadScheduledExecutor().schedule(() ->
        processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
    , 2, TimeUnit.SECONDS);
    

    下面是一个模拟的单元测试示例,您可以运行它来更好地理解发生了什么:

    @Test
    public void simulation() {
        Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));
    
        MonoProcessor<String> processor = MonoProcessor.create();
        Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);
    
        theFluxForSSE.takeUntilOther(processor.log())
                     .log()
                     .blockLast();
    }