
Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList

  • Sergio Sánchez Sánchez  ·  8 years ago

    Using a MongoDbMessageSource I retrieve users, and for each social medium associated with a user I fetch the comments addressed to them.

    I want to persist these comments in MongoDB with the help of a MongoDbStoringMessageHandler linked to the channel storeChannel.

    The flow is as follows:

    @Configuration
    @IntegrationComponentScan
    public class InfrastructureConfiguration {
    
        private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);
    
        /**
         * The Pollers builder factory can be used to configure common bean definitions or 
         * those created from IntegrationFlowBuilder EIP-methods
         */
        @Bean(name = PollerMetadata.DEFAULT_POLLER)
        public PollerMetadata poller() {
            return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
        }
    
        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(25);
            return executor;
        }
    
        /**
         * 
         * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
         * which is the result of execution of a Query
         */
        @Bean
        @Autowired
        public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
            MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
            messageSource.setExpectSingleResult(false);
            messageSource.setEntityClass(UserEntity.class);
            messageSource.setCollectionNameExpression(new LiteralExpression("users"));
            return messageSource;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "storeChannel")
        public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
            MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
            adapter.setCollectionNameExpression(new LiteralExpression("comments"));
            return adapter;
        }
    
        @Bean
        @Autowired
        public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
            return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                    .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                            -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                    )
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> msg) {
                            return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                        }
                    })
                    .channel("directChannel_1")
                    .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> msg) {
                            return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                        }
                    })
                    .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                    .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                            m
                            -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId)headers.get("user-id");
                                        logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] { 
                                            new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                                .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId)headers.get("user-id");
                                        logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] { 
                                            new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                                .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId)headers.get("user-id");
                                        logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] { 
                                            new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                    )
                    .channel("directChannel_2")
                    .aggregate()
                    .channel("directChannel_3")
                    .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                            comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                    .aggregate()
                    .channel("directChannel_4")
                    .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                            comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                    .channel("storeChannel")
                    .get();
        }
    
    }
    

    The debug messages just before the error are as follows:

    2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel    : preSend on channel 'storeChannel', message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
    2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
    2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler   : mongodbAdapter received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
    

    @Document(collection="comments")
    public class CommentEntity {
    
        @Id
        private ObjectId id;
    
        @Field("message")
        private String message;
    
        private ObjectId user;
    
        @PersistenceConstructor
        public CommentEntity(String message, ObjectId user) {
            this.message = message;
            this.user = user;
        }
    
        public ObjectId getId() {
            return id;
        }
    
        public void setId(ObjectId id) {
            this.id = id;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public ObjectId getUser() {
            return user;
        }
    
        public void setUser(ObjectId user) {
            this.user = user;
        }
    
    }
    

    Then this exception is thrown:

    2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
    

    <dependency>
      <groupId>de.flapdoodle.embed</groupId>
      <artifactId>de.flapdoodle.embed.mongo</artifactId>
    </dependency>
    

    Does anyone know what I am doing wrong? Thanks in advance.

    1 answer  |  as of 8 years ago
  •   Artem Bilan    8 years ago

    The exception clearly indicates that MongoDbStoringMessageHandler does not support saving a collection as the payload:

    protected void handleMessageInternal(Message<?> message) throws Exception {
        Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
        String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
        Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null");
    
        Object payload = message.getPayload();
    
        this.mongoTemplate.save(payload, collectionName);
    }
    

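    You don't need .aggregate(): the handler hands the payload straight to mongoTemplate.save(), so with this component the entities have to be stored one by one.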
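
    One way to read this advice: because the handler passes the payload directly to a single-object save, the list has to be broken up so that each CommentEntity arrives on its own. The sketch below is plain Java, not Spring Integration; SingleDocumentStore, RecordingStore and saveEach are hypothetical names that only mimic the shape of the problem. In the flow itself, the equivalent step would presumably be a splitter (for example .split()) in front of storeChannel instead of the trailing .aggregate() calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OneByOneSaveSketch {

    /** Hypothetical stand-in for a single-object save(payload, collectionName) call. */
    interface SingleDocumentStore {
        void save(Object document, String collectionName);
    }

    /** In-memory store that rejects collections, mimicking a single-document save. */
    static class RecordingStore implements SingleDocumentStore {
        final List<Object> saved = new ArrayList<>();

        @Override
        public void save(Object document, String collectionName) {
            if (document instanceof Iterable) {
                throw new IllegalArgumentException("expected a single document, got a collection");
            }
            saved.add(document);
        }
    }

    /** Plays the role of a splitter: one save call per element instead of one per list. */
    static void saveEach(List<?> payload, String collectionName, SingleDocumentStore store) {
        for (Object element : payload) {
            store.save(element, collectionName);
        }
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        saveEach(Arrays.asList("comment 1", "comment 2"), "comments", store);
        System.out.println(store.saved.size() + " documents saved");
    }
}
```

    Passing the whole list to store.save(...) would fail in this sketch, while saveEach stores every element separately.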

    I think it would be a good addition to let that component perform:

    /**
     * Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
     * class.
     *
     * @param collectionToSave the list of objects to save.
     */
    void insertAll(Collection<? extends Object> objectsToSave);
    

    Please raise a JIRA on the matter, and don't hesitate to contribute!
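
    To make the proposed addition concrete, here is a rough sketch of how a handler might dispatch on the payload type. It is not the actual Spring Integration implementation: DocumentOperations, RecordingOperations and store are hypothetical names, and only the save / insertAll method shapes are taken from the snippets above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class CollectionAwareStoreSketch {

    /** Hypothetical subset of a template API: single save plus a bulk insertAll. */
    interface DocumentOperations {
        void save(Object document, String collectionName);
        void insertAll(Collection<?> documents);
    }

    /** Records which call was used so the dispatch can be observed. */
    static class RecordingOperations implements DocumentOperations {
        final List<String> calls = new ArrayList<>();

        @Override
        public void save(Object document, String collectionName) {
            calls.add("save:" + collectionName);
        }

        @Override
        public void insertAll(Collection<?> documents) {
            calls.add("insertAll:" + documents.size());
        }
    }

    /**
     * Collection payloads go to insertAll, everything else to save.
     * Note that insertAll derives the target collection from each object's class,
     * so collectionName is only used in the single-object branch.
     */
    static void store(Object payload, String collectionName, DocumentOperations operations) {
        if (payload instanceof Collection) {
            operations.insertAll((Collection<?>) payload);
        } else {
            operations.save(payload, collectionName);
        }
    }

    public static void main(String[] args) {
        RecordingOperations operations = new RecordingOperations();
        store(Arrays.asList("c1", "c2", "c3"), "comments", operations);
        store("single comment", "comments", operations);
        System.out.println(operations.calls);
    }
}
```

    With such a dispatch the aggregated list from the question could be stored in one call, at the cost of ignoring the configured collection name for collection payloads.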