代码之家  ›  专栏  ›  技术社区  ›  Hasnain Ali Bohra

如何从Spring集成文件中读取嵌套的txt文件

  •  0
  • Hasnain Ali Bohra  · 技术社区  · 4 年前

    我有如下配置文件:-

    @EnableBinding(Source.class)
    @Configuration
    @EnableIntegrationManagement
    public class FileSourceConfig {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceConfig.class);
    
        private FileSourceProperties properties;
    
        Source source;
    
        public FileSourceConfig(FileSourceProperties properties, Source source) {
            this.properties = properties;
            this.source = source;
        }
    
        @Bean
        public DynamicRegexPatternFilter getFilter(){
            return new DynamicRegexPatternFilter();
        }
    
    
        @Bean
        public MessageChannel linesChannel() {
            return new DirectChannel();
        }
    
        /* To poll the file for every given TimeUnit.SECONDS */
        @Bean(name = { "defaultPoller", PollerMetadata.DEFAULT_POLLER })
        public PollerMetadata defaultPoller() {
            PollerMetadata pollerMetadata = new PollerMetadata();
            pollerMetadata.setTrigger(new PeriodicTrigger(properties.getPollPeriod(), TimeUnit.SECONDS));
            return pollerMetadata;
        }
    
        @Bean
        public IntegrationFlow fileInboundChannelFlow() {
            FileInboundChannelAdapterSpec messageSourceSpec = Files
                    .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
    
            messageSourceSpec = messageSourceSpec.filter(getFilter());
    
            //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
            messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
    
            //Setting random UUID as messagekey
            IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
                    .split(new FileSplitter(true, true))
                    .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY,"T(java.util.UUID).randomUUID().toString()"));
    
            return flowBuilder.<Object, Class<?>>route(Object::getClass,
                    m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
                            "lines.input"))
                    .get();
        }
    
        @Bean
        public IntegrationFlow lines() {
            return f -> f.headerFilter("file_originalFile") .channel(source.output());
        }
    
    
        @Bean
        public IntegrationFlow logErrors() {
            return f -> f.log(LoggingHandler.Level.ERROR, "error", m -> "Error in sending message :"+m.getPayload());
        }
    
        @Bean
        public IntegrationFlow markers() {
            return f -> f.log().<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                    .handle(m -> m.getHeaders(), e -> e.id("archive").advice(afterAdvice()));
        }}
    

    有人可以建议如何从inbound/a/a.txt和inblud/b/b.txt读取文件吗

    请在下面找到筛选代码:-

    public class DynamicRegexPatternFilter extends AbstractFileListFilter<File> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(DynamicRegexPatternFilter.class);
    
        @Autowired
        private FileSourceProperties properties;
    
        @Override
        public boolean accept(File file) {
            String[] quaterRange = {"[0][0-3]", "[0][4-6]", "[0][7-9]", "[1][0-2]"};
            String fileNameRegex = this.properties.getFilenameRegex();
            //logic here
    
            return Pattern.compile(fileNameRegex)
                    .matcher(file.getName())
                    .matches();
        }
    
    0 回复  |  直到 4 年前
        1
  •  2
  •   Artem Bilan    4 年前

    请参阅 RecursiveDirectoryScanner . 默认情况下 FileReadingMessageSource 附带a DefaultDirectoryScanner 。因此,您只需配置您的 messageSourceSpec 有了这个 递归目录扫描程序 :

    FileInboundChannelAdapterSpec messageSourceSpec =
                    Files.inboundAdapter(Paths.get(this.properties.getDirectory()).toFile())
                            .scanner(new RecursiveDirectoryScanner());
    

    另请参阅文档: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#directory-scanning-and-polling