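We are using Spring Boot 3.0.4, Spring Batch 5.0.0, and Java 19.
We have created a job with a RepositoryItemReader, an ItemProcessor, and an ItemWriter, using chunk-oriented processing.
Basically, we read records in chunks from one table (MySQL DB), convert the data into a new object in the item processor, and then write the entities back to a new table in the database (MySQL).
We are testing with 1M records and a chunk size of 10K.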
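For reference, this is the chunk-oriented model we have in mind, written as a plain-Java sketch. It is not Spring Batch's actual implementation, and the names `ChunkLoopSketch` and `run` are hypothetical, used only for illustration. With 1M records and a chunk size of 10K the loop runs 100 times, and in this model each chunk becomes unreachable once it has been written.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;

/** Plain-Java sketch of a chunk-oriented step (hypothetical, not Spring Batch code). */
public class ChunkLoopSketch {

    /**
     * Reads up to chunkSize items, processes them, hands them to the writer,
     * then drops the chunk. Returns the number of chunks written.
     */
    public static <I, O> int run(Iterator<I> reader,
                                 Function<I, O> processor,
                                 Consumer<List<O>> writer,
                                 int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most chunkSize input items.
            List<I> items = new ArrayList<>(chunkSize);
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: convert every input item to an output item.
            List<O> outputs = new ArrayList<>(items.size());
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the whole chunk is written in one call.
            writer.accept(outputs);
            chunks++;
            // items and outputs go out of scope here, so nothing in this
            // loop keeps the chunk reachable after it has been written.
        }
        return chunks;
    }

    public static void main(String[] args) {
        long[] written = {0};
        Function<Integer, String> processor = i -> "tx-" + i;
        Consumer<List<String>> writer = chunk -> written[0] += chunk.size();

        int chunks = run(IntStream.range(0, 1_000_000).iterator(),
                processor, writer, 10_000);

        // prints "chunks=100 items=1000000"
        System.out.printf("chunks=%d items=%d%n", chunks, written[0]);
    }
}
```

If a real step behaved exactly like this loop, peak heap usage would be bounded by roughly one chunk of input and output objects, so any steady growth would have to come from something that keeps references across chunks.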
During this step we see heap usage grow continuously, and the memory is never released.
We configured the step as follows:
@Bean
public Step createTransactionsStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   ItemReader itemReaderBillingTransaction,
                                   BillingTransactionWriter writer,
                                   RepositoryItemReader<BillingTransaction> myReader
) {
    return new StepBuilder(CREATE_TRANSACTIONS, jobRepository)
            .<BillingTransaction, Transaction>chunk(chunkSize, transactionManager)
            .reader(myReader)
            .processor(createTransactionsItemProcessor())
            .writer(writer)
            .listener(createTransactionListener)
            .build();
}

@Bean
@StepScope
public RepositoryItemReader<BillingTransaction> myReader(BillingTransactionRepositoryCustom repository,
                                                         @Value("#{jobParameters[bsLogId]}") String bsLogId) {
    Map<String, Sort.Direction> sortMap = new HashMap<>();
    sortMap.put("recordId", Sort.Direction.ASC);
    return new RepositoryItemReaderBuilder<BillingTransaction>()
            .repository(repository)
            .methodName("findByBsLogIdAndBsStatus")
            .arguments(Arrays.asList(Integer.valueOf(bsLogId), VALIDATIONS_IN_PROCESS.code))
            .pageSize(chunkSize)
            .sorts(sortMap)
            .saveState(false)
            .build();
}

@Bean
public ItemProcessor<BillingTransaction, Transaction> createTransactionsItemProcessor() {
    return new CreateTransactionsItemProcessor();
}
Below is our implementation of CreateTransactionsItemProcessor:
public class CreateTransactionsItemProcessor implements ItemProcessor<BillingTransaction, Transaction> {
    @Autowired
    TransactionCreator transactionCreator;
    @Autowired
    AshraitCfgRepository ashraitCfgRepository;
    @Autowired
    CustomRoutingDS customRoutingDS;
    private String merchantId;
    @Autowired
    protected LogWriter logWriter;
    private StepExecution stepExecution;
    @Autowired
    BillingLogRepository billingLogRepository;

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        String gatewayId = getStepJobParameterValue(stepExecution, JOB_PARAMETER_GATEWAY_ID);
        String pid = getStepJobParameterValue(stepExecution, JOB_PARAMETER_PID);
        String bsLogId = getStepJobParameterValue(stepExecution, JOB_PARAMETER_BS_LOG_ID);
        Optional.ofNullable(pid)
                .ifPresentOrElse(this::initBillingLog, () -> {
                    throw new RuntimeException();
                });
        // Save some parameters in the job execution context for later use
        customRoutingDS.setCurrentGatewayId(gatewayId);
        stepExecution.getJobExecution().getExecutionContext().put("ashVersion", ashraitCfgRepository.getAshVersion());
        stepExecution.getJobExecution().getExecutionContext().put("manufUse", ashraitCfgRepository.getManufUse());
        stepExecution.getJobExecution().getExecutionContext().put("manufId", ashraitCfgRepository.getManufId());
        stepExecution.getJobExecution().getExecutionContext().put("merchantId", merchantId);
        stepExecution.getJobExecution().getExecutionContext().put("GATEWAY_LOG_NUMERATOR", 0);
        stepExecution.getJobExecution().getExecutionContext().put("RQUEST_ID", String.format("%s-%s-%s", pid, bsLogId, 0));
        DateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
        stepExecution.getJobExecution().getExecutionContext().put("date", dateFormat.format(new Date()));
    }

    @Override
    public Transaction process(BillingTransaction item) throws Exception {
        Transaction tr;
        try {
            tr = transactionCreator.createTransaction(item, stepExecution);
        } catch (Exception e) {
            logWriter.info(stepExecution, this.getClass().getSimpleName(), e.toString());
            throw new RuntimeException(e);
        }
        setJobExecutionContext(stepExecution.getJobExecution());
        return tr;
    }
Below is our implementation of the writer:
@Component
public class BillingTransactionWriter implements
        ItemWriter<Transaction>, StepExecutionListener {
    private JobExecution jobExecution;
    @Autowired
    protected LogWriter logWriter;
    @Autowired
    private GatewayLogRepositoryCustom gatewayLogRepositoryCustom;
    @Autowired
    private ShovarNumeratorRepository shovarNumeratorRepository;
    @Autowired
    private CustomRoutingDS customRoutingDS;
    @Autowired
    private EntityManagerDynamicQueryService<String> entityManagerDynamicQueryService;

    @Override
    public void beforeStep(@NotNull StepExecution stepExecution) {
        logWriter.info(stepExecution, this.getClass().getSimpleName(), "BillingTransaction writer initialized.");
        jobExecution = stepExecution.getJobExecution();
        // jobExecution.getExecutionContext().put("GATEWAY_LOG_NUMERATOR", 0);
        jobExecution.getExecutionContext().put("INSERT_TOTAL_NUMERATOR", 0);
    }

    @Override
    public void write(Chunk<? extends Transaction> transactions) {
        int incr = Integer.parseInt(Objects.requireNonNull(jobExecution.getExecutionContext().get("GATEWAY_LOG_NUMERATOR")).toString());
        int total = Integer.parseInt(Objects.requireNonNull(jobExecution.getExecutionContext().get("INSERT_TOTAL_NUMERATOR")).toString());
        List<Transaction> bts = (List<Transaction>) transactions.getItems();
        System.out.printf("Received list of size: %d total insert : %d, GatewayLog Numerator is : %d%n", bts.size(), total, incr);
        total += bts.size();
        List<String> transactionIds = insertToGatewayLog(bts);
        insertToTransactions(bts, transactionIds);
        jobExecution.getExecutionContext().put("GATEWAY_LOG_NUMERATOR", ++incr);
        jobExecution.getExecutionContext().put("INSERT_TOTAL_NUMERATOR", total);
        String pid = jobExecution.getJobParameters().getString(JOB_PARAMETER_PID);
        String bsLogId = jobExecution.getJobParameters().getString(JOB_PARAMETER_BS_LOG_ID);
        jobExecution.getExecutionContext().put("RQUEST_ID", String.format("%s-%s-%s", pid, bsLogId, incr));
    }
In our test with 1M records, we need about 8GB of RAM available for heap space; otherwise we get the following exception:
org.springframework.batch.core.step.AbstractStep][taskExecutor-1][ERROR] Encountered an error executing step createTransactions in job batchJob
java.lang.OutOfMemoryError: Java heap space
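In addition, this memory is not released after the job completes.
We expected that using Spring Batch's built-in reader and writer support with chunks would use heap memory efficiently and release the memory once a given chunk completes.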
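To check whether the heap is really retained, as opposed to merely staying committed to the JVM, a small probe like the following could be logged before the job, after each chunk, and after the job. This is a minimal sketch using the standard `java.lang.management` API; the class name `HeapProbe` and the labels are hypothetical. Used heap that stays high after a GC request points to live references, whereas a high committed size alone can simply mean the JVM has not handed memory back to the operating system.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper that reports used versus committed heap. */
public class HeapProbe {

    /** Heap bytes currently occupied by objects (live or not yet collected). */
    public static long usedHeapBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    /** One-line summary suitable for logging at chunk or job boundaries. */
    public static String describe(String label) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("%s: used=%d MB, committed=%d MB",
                label, heap.getUsed() >> 20, heap.getCommitted() >> 20);
    }

    public static void main(String[] args) {
        System.out.println(describe("start"));

        // Allocate about 64 MB so the difference is visible.
        byte[][] blocks = new byte[64][];
        for (int i = 0; i < blocks.length; i++) {
            blocks[i] = new byte[1 << 20];
        }
        System.out.println(describe("after allocating " + blocks.length + " MB"));

        // Drop the only reference, then ask for a collection.
        // System.gc() is only a request; the JVM may ignore or delay it.
        blocks = null;
        System.gc();
        System.out.println(describe("after GC request"));
    }
}
```

Calling `HeapProbe.describe(...)` from a chunk listener or at the end of `write` would show whether used heap grows with every chunk or only the committed size does.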