代码之家  ›  专栏  ›  技术社区  ›  Tomek Zaremba

Spring-Webflux和@Cacheable-Mono/Flux类型缓存结果的正确方法

  •  26
  • Tomek Zaremba  · 技术社区  · 7 年前

    我正在学习SpringWebFlux,在编写示例应用程序的过程中,我发现了一个与反应型(Mono/Flux)和Spring缓存相结合的问题。

    考虑以下代码段(在Kotlin中):

    @Repository
    interface TaskRepository : ReactiveMongoRepository<Task, String>
    
    @Service
    class TaskService(val taskRepository: TaskRepository) {
    
        @Cacheable("tasks")
        fun get(id: String): Mono<Task> = taskRepository.findById(id)
    }
    

    这种缓存返回Mono或Flux的方法调用的方法有效且安全吗?也许还有其他一些原则可以做到这一点?

    以下代码正在使用SimpleCacheSolver,但默认情况下,由于Mono不可序列化,Redis会失败。为了使它们工作,例如需要使用Kryo序列化程序。

    3 回复  |  直到 7 年前
        1
  •  42
  •   Oleh Dokuka    7 年前

    黑客方式

    目前,还没有流畅的集成 @Cacheable 3号反应堆。 但是,您可以通过添加 .cache() 要返回的操作员 Mono

    @Repository
    interface TaskRepository : ReactiveMongoRepository<Task, String>
    
    @Service
    class TaskService(val taskRepository: TaskRepository) {
    
        @Cacheable("tasks")
        fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
    }
    

    那个 乱劈 从返回的缓存和共享 taskRepository 数据反过来,spring cacheable将缓存返回的 单声道 然后,将返回该引用。换句话说,它是一个mono缓存,它保存着缓存:)。

    反应堆加载项方式

    有一个 addition 到Reactor 3,可与现代内存缓存流畅集成,如 caffeine , jcache 等。使用该技术,您将能够轻松缓存数据:

    @Repository
    interface TaskRepository : ReactiveMongoRepository<Task, String>
    
    @Service
    class TaskService(val taskRepository: TaskRepository) {
    
        @Autowire
        CacheManager manager;
    
    
        fun get(id: String): Mono<Task> = CacheMono.lookup(reader(), id)
                                                   .onCacheMissResume(() -> taskRepository.findById(id))
                                                   .andWriteWith(writer());
    
        fun reader(): CacheMono.MonoCacheReader<String, Task> = key -> Mono.<Signal<Task>>justOrEmpty((Signal) manager.getCache("tasks").get(key).get())
        fun writer(): CacheMono.MonoCacheWriter<String, Task> = (key, value) -> Mono.fromRunnable(() -> manager.getCache("tasks").put(key, value));
    } 
    

    注意:Reactor插件缓存自己的抽象 Signal<T> ,因此,不要担心这一点,也不要遵循这一惯例

        2
  •  3
  •   ilker Kopan    6 年前

    我使用了Oleh Dokuka的黑客解决方案,效果很好,但有一个陷阱。在Flux cache中使用的持续时间必须大于可缓存缓存的timetolive值。如果不使用通量缓存的持续时间,则不会使其无效(通量文档称“将此通量转换为热源,并缓存最后发出的信号以供其他订阅者使用”)。 因此,将Flux cache设置为2分钟,timetolive设置为30秒是有效的配置。如果首先发生ehcahce超时,则会生成一个新的Flux cache引用并使用它。

        3
  •  -1
  •   Developer    5 年前

    //在立面中:

    public Mono<HybrisResponse> getProducts(HybrisRequest request) {
        return Mono.just(HybrisResponse.builder().build());
    }
    

    //在服务层中:

    @Cacheable(cacheNames = "embarkations")
    public HybrisResponse cacheable(HybrisRequest request) {
        LOGGER.info("executing cacheable");
        return null;
    }
    
    @CachePut(cacheNames = "embarkations")
    public HybrisResponse cachePut(HybrisRequest request) {
        LOGGER.info("executing cachePut");
        return hybrisFacade.getProducts(request).block();
    }
    

    //在控制器中:

    HybrisResponse hybrisResponse = null;
    
    try {
       // get from cache
       hybrisResponse = productFeederService.cacheable(request);
    
    } catch (Throwable e) {
       // if not in cache then cache it
       hybrisResponse = productFeederService.cachePut(request);
    }
    
    return Mono.just(hybrisResponse)
        .map(result -> ResponseBody.<HybrisResponse>builder()
            .payload(result).build())
        .map(ResponseEntity::ok);