
Spring Boot + Spring Batch + Spring JPA

  •  2
  • indusBull  · 7 years ago

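    I am working on a Spring Batch job that moves data from SQL Server to Cassandra. I use Spring Data JPA to read and write the data, and I have already created entities and JPA repositories for both databases.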

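    Now I can't figure out how to use my JpaRepository with a Spring Batch ItemReader. I searched the internet and found a few references that mention JpaPagingItemReader, but that requires specifying a query and configuring other details. I don't know how to use my existing JpaRepository instead. Below are snippets of the relevant code.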

    My JpaRepository for SQL Server:

    public interface ScanJpaRepository extends JpaRepository<Scan, Integer> 
    {
    
       @Transactional(readOnly = true)
       @Query("select s from Scan s left join fetch s.projectVersion")
       Stream<Scan> findAllScan();
    }
    

    My Spring Batch job:

    @Configuration
    @EnableBatchProcessing
    public class SSCBatchConfigurationCassandra {
    
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
    
        @Bean
        public PlatformTransactionManager transactionManager() {
            return new ResourcelessTransactionManager();
        }
    
    
        @Bean
        public JobExplorer jobExplorer() throws Exception {
            MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(mapJobRepositoryFactoryBean());
            jobExplorerFactory.afterPropertiesSet();
            return jobExplorerFactory.getObject();
        }
    
        @Bean
        public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
            MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
            mapJobRepositoryFactoryBean.setTransactionManager(transactionManager());
            return mapJobRepositoryFactoryBean;
        }
    
        @Bean
        public JobRepository jobRepository() throws Exception {
            return mapJobRepositoryFactoryBean().getObject();
        }
    
        @Bean
        public JobLauncher jobLauncher() throws Exception {
            SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
            simpleJobLauncher.setJobRepository(jobRepository());
            return simpleJobLauncher;
        }
    
    
        @Bean
        public ItemReader<Project> reader() {
             // how to read from ScanJpaRepository ??
        }
    
        @Bean
        public CassandraItemProcessor processor() {
            return new CassandraItemProcessor();
        }
    
        @Bean
        public ItemWriter<CassandraProject> cqlWriter() {
             final CassandraBatchItemWriter writer = new CassandraBatchItemWriter();
             return writer;
        }
    
    
    
        // tag::jobstep[]
        @Bean
        public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
            return jobBuilderFactory.get("importUserJob")
                    .incrementer(new RunIdIncrementer())
                    .listener(listener)
                    .flow(step1)
                    .end()
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Project, CassandraProject> chunk(100)
                    .reader(reader())
                    .processor(processor())
                    .writer(cqlWriter())
                    .build();
        }
    
    
        // end::jobstep[]
    }
    

    Update #1: As suggested, I added the item reader to the batch job.

    @Configuration
    @EnableBatchProcessing
    public class FortifySSCBatchConfigurationCassandra {
       ....
       @Autowired
       public ScanItemReader itemReader;
       .....
    
       @Bean
       public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Scan, CScan> chunk(100)
                .reader(itemReader)
                .processor(processor())
                .writer(cqlWriter())
                .build();
         }
    
    
    }
    

    My IDE complains about this:

    The method reader(ItemReader<? extends Scan>) in the type SimpleStepBuilder<Scan,CScan> is not applicable for the arguments (ScanItemReader)
    

    Update #2:

    public class CassandraItemProcessor implements ItemProcessor<Scan, CScan> {
    
    
        @Override
        public CScan process(Scan s) throws Exception {
    
    
            .. 
    
            return new CScan();
        }
    
    }
    
    
    public class CassandraBatchItemWriter implements ItemWriter<CScan> {
    
    
    
    
        @Override
        public void write(List<? extends CScan> arg0) throws Exception {
            // TODO Auto-generated method stub
    
        }
    
    
    }
    
    2 answers  |  last activity 7 years ago
        1
  •  2
  •   pvpkiran    7 years ago

    You can declare your reader like this:

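    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    
    import javax.annotation.PostConstruct;
    
    import org.springframework.batch.core.configuration.annotation.JobScope;
    import org.springframework.batch.item.data.RepositoryItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.domain.Sort;
    import org.springframework.stereotype.Component;
    
    @Component
    @JobScope
    public class ScanItemReader extends RepositoryItemReader<Scan> {
    
      private final ScanJpaRepository repository;
    
      @Autowired
      public ScanItemReader(final ScanJpaRepository repository) {
        super();
        this.repository = repository;
      }
    
      @PostConstruct
      protected void init() {
        final Map<String, Sort.Direction> sorts = new HashMap<>();
        // The key can be any field name of your entity class.
        sorts.put("Your sort parameter", Sort.Direction.ASC);
        this.setRepository(this.repository);
        this.setSort(sorts);
        // Specify the repository method that Spring Batch should call to
        // fetch data. This method must return Page<T>.
        this.setMethodName("");
        // Add the arguments that method needs (other than the Pageable), if any.
        this.setArguments(new ArrayList<>());
      }
    }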
    

    Autowire this reader bean and use it in the StepBuilder.
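    As an alternative to subclassing, the same reader can probably be declared as a plain bean with RepositoryItemReaderBuilder (available since Spring Batch 4.0). The sketch below is a minimal example under a few assumptions that are not in the question: the Scan entity has a unique, sortable property named "id", the class name ScanReaderConfiguration and the bean name scanReader are made up for illustration, and Scan and ScanJpaRepository live in the same package as this class (otherwise import them). It calls findAll(Pageable), which ScanJpaRepository inherits from JpaRepository, rather than findAllScan(): that method returns a Stream, while the reader expects a method that accepts a Pageable and returns a Page. Note that findAll(Pageable) does not apply the "left join fetch" from the custom query.

```java
import java.util.Collections;

import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;

// Hypothetical configuration class; Scan and ScanJpaRepository come from the question.
@Configuration
public class ScanReaderConfiguration {

    @Bean
    public RepositoryItemReader<Scan> scanReader(ScanJpaRepository repository) {
        return new RepositoryItemReaderBuilder<Scan>()
                .name("scanReader")          // used as the key prefix in the execution context
                .repository(repository)
                .methodName("findAll")       // Page<Scan> findAll(Pageable), inherited from JpaRepository
                .pageSize(100)               // rows fetched per repository call
                // Assumption: Scan has an "id" property; a stable sort keeps paging deterministic.
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .build();
    }
}
```

    The bean can then be injected into the step definition and passed to .reader(...) in a step declared as .<Scan, CScan> chunk(100), the same way as the ScanItemReader component above.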

        2
  •  0
  •   Puneet Kumar    7 years ago

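    I am trying to do the same thing, but I am stuck. If it worked for you, please share a Git repository or any other reference. I would like to see how to read data from one database and write it to another. I already have the JPA methods; I just want to autowire and use them.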