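I am working on a Spring Batch job that moves data from SQL Server to Cassandra. I use Spring Data JPA to read and write the data, and I have already created entities and JPA repositories for both databases.
Now I can't figure out how to use my JpaRepository with a Spring Batch ItemReader. I searched the internet and found a few references that suggest using JpaPagingItemReader, but that requires specifying a query and configuring other details. I don't see how to reuse my existing JpaRepository instead. Below are the relevant code snippets.
My JpaRepository for SQL Server: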
public interface ScanJpaRepository extends JpaRepository<Scan, Integer>
{
@Transactional(readOnly = true)
@Query("select s from Scan s left join fetch s.projectVersion")
Stream<Scan> findAllScan();
}
My Spring Batch job:
@Configuration
@EnableBatchProcessing
public class SSCBatchConfigurationCassandra {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobExplorer jobExplorer() throws Exception {
MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(mapJobRepositoryFactoryBean());
jobExplorerFactory.afterPropertiesSet();
return jobExplorerFactory.getObject();
}
@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
mapJobRepositoryFactoryBean.setTransactionManager(transactionManager());
return mapJobRepositoryFactoryBean;
}
@Bean
public JobRepository jobRepository() throws Exception {
return mapJobRepositoryFactoryBean().getObject();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository());
return simpleJobLauncher;
}
@Bean
public ItemReader<Project> reader() {
// how to read from ScanJpaRepository?
}
@Bean
public CassandraItemProcessor processor() {
return new CassandraItemProcessor();
}
@Bean
public ItemWriter<CassandraProject> cqlWriter() {
final CassandraBatchItemWriter writer = new CassandraBatchItemWriter();
return writer;
}
// tag::jobstep[]
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Project, CassandraProject> chunk(100)
.reader(reader())
.processor(processor())
.writer(cqlWriter())
.build();
}
// end::jobstep[]
}
Update #1:
As suggested, I added the item reader to the batch job.
@Configuration
@EnableBatchProcessing
public class FortifySSCBatchConfigurationCassandra {
....
@Autowired
public ScanItemReader itemReader;
.....
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Scan, CScan> chunk(100)
.reader(itemReader)
.processor(processor())
.writer(cqlWriter())
.build();
}
}
My IDE complains about this:
The method reader(ItemReader<? extends Scan>) in the type SimpleStepBuilder<Scan,CScan> is not applicable for the arguments (ScanItemReader)
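For reference, the message above suggests the compiler does not see ScanItemReader as an ItemReader<Scan>. That usually means the class does not declare that interface with that type argument, or it refers to a different Scan class, though the actual ScanItemReader is not shown here. The following is only a minimal sketch under stated assumptions: the ItemReader interface, the Scan class, and StepSketch are hypothetical stand-ins so the example compiles without Spring on the classpath. They are not the real Spring Batch or entity types.

```java
import java.util.Iterator;
import java.util.stream.Stream;

// Hypothetical stand-in for org.springframework.batch.item.ItemReader,
// declared here only so the sketch compiles without Spring.
interface ItemReader<T> {
    T read() throws Exception; // returns null once the input is exhausted
}

// Hypothetical stand-in for the Scan entity.
class Scan {
    final int id;

    Scan(int id) {
        this.id = id;
    }
}

// Declaring ItemReader<Scan> explicitly is what lets this class be passed
// where ItemReader<? extends Scan> is expected.
class ScanItemReader implements ItemReader<Scan> {
    private final Iterator<Scan> iterator;

    // Assumption: the stream would come from something like
    // ScanJpaRepository.findAllScan().
    ScanItemReader(Stream<Scan> scans) {
        this.iterator = scans.iterator();
    }

    @Override
    public Scan read() {
        // One item per call, null when there is nothing left.
        return iterator.hasNext() ? iterator.next() : null;
    }
}

// Hypothetical stand-in for a method with the same parameter type as
// SimpleStepBuilder.reader(...).
class StepSketch {
    static int drain(ItemReader<? extends Scan> reader) {
        int count = 0;
        try {
            while (reader.read() != null) {
                count++;
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return count;
    }
}
```

With the real classes, the same idea would apply to `.reader(itemReader)` in step1. Note that a Stream returned by a Spring Data JPA repository has to be consumed while a transaction is open and closed afterwards, so a reader built this way may need extra care that the sketch leaves out.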
Update #2:
public class CassandraItemProcessor implements ItemProcessor<Scan, CScan> {
@Override
public CScan process(Scan s) throws Exception {
..
return new CScan();
}
}
public class CassandraBatchItemWriter implements ItemWriter<CScan> {
@Override
public void write(List<? extends CScan> arg0) throws Exception {
// TODO Auto-generated method stub
}
}