我认为部分问题是你试图
Disposable
在会话结束时要调用的。但是这样做的时候,你自己就订阅了序列。Spring框架还将订阅
Flux
返回者
getStreamSuspendCount
,并且需要取消订阅,以便SSE客户端得到通知。
现在如何做到这一点?你需要的是一种“阀门”,一旦接收到外部信号,它就会取消信号源。这就是
takeUntilOther(Publisher<?>)
做。
所以现在你需要一个
Publisher<?>
您可以将其与会话生命周期(更具体地说是会话关闭事件)联系起来:一旦它发出,
takeUntilOther
将取消其源。
2种选择:
-
会话关闭事件在类似api:use的侦听器中公开
Mono.create
-
您确实需要手动触发取消:使用
MonoProcessor.create()
当时间到来的时候,推动任何价值通过它
下面是一些简化的示例,其中包含一些组合的API,以澄清:
创建
return theFluxForSSE.takeUntilOther(Mono.create(sink ->
sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));
单处理器
MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
让我们用计划任务模拟会话结束:
Executors.newSingleThreadScheduledExecutor().schedule(() ->
processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);
下面是一个模拟的单元测试示例,您可以运行它来更好地理解发生了什么:
@Test
public void simulation() {
Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));
MonoProcessor<String> processor = MonoProcessor.create();
Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);
theFluxForSSE.takeUntilOther(processor.log())
.log()
.blockLast();
}