代码之家  ›  专栏  ›  技术社区  ›  Debopam

Spring集成DSL:Dispatcher没有订户

  •  0
  • Debopam  · 技术社区  · 7 年前

    Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
    
    2018-09-24 12:16:22.004 DEBUG 17536 --- [ask-scheduler-2] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.thirdpartyAgentDemographicFlow-Processing'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], headers={id=9e342354-8436-e1de-774e-937c8b6809d5, timestamp=1537816582001}] for original GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
    

    代码/集成流程

    @Bean("sftpAgentInboundFlow")
    public IntegrationFlow sftpAgentInboundFlow(SessionFactory<LsEntry> sftpSessionFactory) {
        return IntegrationFlows
                .from(Sftp.inboundAdapter(sftpSessionFactory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory(agentRemoteDir)
                        .filter(new AcceptOnceFileListFilter<>())
                        .regexFilter(".*\\.zip$")
                        .localDirectory(new File(inputDir))
                        .autoCreateLocalDirectory(true)
                        .maxFetchSize(1)
                        ,
                        consumer -> consumer.id("sftpInboundAdapter")
                        .autoStartup(false)
                        .poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
                .publishSubscribeChannel(pubSub -> pubSub
                                .id("AgentInboundDemographic-PubSub-Channel")
                                .subscribe(flow -> flow.bridge(e -> e.id("ziparchiver")).handle(agentDataArchiveChannelAdapter()))
                                .subscribe(surancebayAgentDemographicFlow())
                        )
                .get();
    }
    
    
    
    //@Bean("surancebayAgentDemographicFlow")
    public IntegrationFlow surancebayAgentDemographicFlow() {
        return IntegrationFlows
                //.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)/*.maxMessagesPerPoll(corepoolsize)*/))
                .from(MessageChannels.direct("thirdpartyAgentDemographicFlow-Processing"))
                .transform(unZipTransformer())
                .split(splitter())
                .channel(MessageChannels.executor(taskExecutor()))
                .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                        .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                        .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                        )
                .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {
    
                    @Override
                    public Object getCorrelationKey(Message<?> message) {
                        return "processdate";
                    }
                }).sendPartialResultOnExpiry(true))
                .handle("agentDemograpicOutput","generateAgentDemographicFile")
                .channel(confirmChannel())
                .get()
                ;
    }
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   Artem Bilan    7 年前

    好 啊!我认为问题是你使用了一些Spring集成版本,其中的特性使用了一个外部接口 IntegrationFlow 作为一个子流尚未实现。或者考虑升级到最新版本,或者使用变通方法作为解决方案 .subscribe("thirdpartyAgentDemographicFlow-Processing") @Bean 上的批注 surancebayAgentDemographicFlow 定义。