
Spring Batch RepositoryItemReader heap size issue

  • Yossi R  · Tech Community  · 2 years ago

    We are using Spring Boot 3.0.4, Spring Batch 5.0.0, and Java 19.

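    We created a job with a RepositoryItemReader, an ItemProcessor, and an ItemWriter, using chunk-oriented processing. Basically, we read records in chunks from one table (MySQL), convert the data into new objects in the item processor, and then write the resulting entities to a new table in the database (also MySQL).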

    We are testing with 1M records and a chunk size of 10K.
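
For orientation, these numbers mean the step should run 100 chunks. A minimal sketch of that arithmetic follows; the class and method names are illustrative and not part of our job.

```java
// Illustrative helper (not part of the job): how many chunks a run needs.
final class ChunkMath {

    // Ceiling division: the last chunk may be only partially filled.
    static long chunkCount(long totalRecords, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (totalRecords + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        // 1M records with a chunk size of 10K, as in our test.
        System.out.println(chunkCount(1_000_000L, 10_000L));
    }
}
```

If memory were released per chunk, the peak heap should be governed by roughly one chunk of 10K items rather than by the full 1M.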

    During this step, we see heap usage grow continuously, and the memory is never released.

    We configured the step as follows:

    @Bean
    public Step createTransactionsStep(JobRepository jobRepository,
                                       PlatformTransactionManager transactionManager,
                                       ItemReader itemReaderBillingTransaction,
                                       BillingTransactionWriter writer,
                                       RepositoryItemReader<BillingTransaction> myReader
                                     ) {
    
        return new StepBuilder(CREATE_TRANSACTIONS, jobRepository)
                .<BillingTransaction, Transaction>chunk(chunkSize,transactionManager)
                .reader(myReader)
                .processor(createTransactionsItemProcessor())
                .writer(writer)
                .listener(createTransactionListener)
                .build();
    }
    
    @Bean
    @StepScope
    public RepositoryItemReader<BillingTransaction> myReader(BillingTransactionRepositoryCustom repository,
                                                             @Value("#{jobParameters[bsLogId]}") String bsLogId) {
    
        Map<String, Sort.Direction> sortMap = new HashMap<>();
        sortMap.put("recordId", Sort.Direction.ASC);
    
        return new RepositoryItemReaderBuilder<BillingTransaction>()
                .repository(repository)
                .methodName("findByBsLogIdAndBsStatus")
                .arguments(Arrays.asList(Integer.valueOf(bsLogId), VALIDATIONS_IN_PROCESS.code))
                .pageSize(chunkSize)
                .sorts(sortMap)
                .saveState(false)
                .build();
    }
    
    @Bean
    public ItemProcessor<BillingTransaction, Transaction> createTransactionsItemProcessor() {
        return new CreateTransactionsItemProcessor();
    }
    

    Here is our implementation of CreateTransactionsItemProcessor:

    public class CreateTransactionsItemProcessor implements ItemProcessor<BillingTransaction, Transaction> {
    
    
    @Autowired
    TransactionCreator transactionCreator;
    
    @Autowired
    AshraitCfgRepository ashraitCfgRepository;
    
    @Autowired
    CustomRoutingDS customRoutingDS;
    
    private String merchantId;
    
    @Autowired
    protected LogWriter logWriter;
    
    private StepExecution stepExecution;
    
    @Autowired
    BillingLogRepository billingLogRepository;
    
    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        String gatewayId = getStepJobParameterValue(stepExecution, JOB_PARAMETER_GATEWAY_ID);
        String pid = getStepJobParameterValue(stepExecution, JOB_PARAMETER_PID);
        String bsLogId = getStepJobParameterValue(stepExecution, JOB_PARAMETER_BS_LOG_ID);
    
        Optional.ofNullable(pid)
                .ifPresentOrElse(this::initBillingLog, () -> {
                    throw new RuntimeException();
                });
    
        // Store parameters in the job execution context for use later in the job
        customRoutingDS.setCurrentGatewayId(gatewayId);
        stepExecution.getJobExecution().getExecutionContext().put("ashVersion", ashraitCfgRepository.getAshVersion());
        stepExecution.getJobExecution().getExecutionContext().put("manufUse", ashraitCfgRepository.getManufUse());
        stepExecution.getJobExecution().getExecutionContext().put("manufId", ashraitCfgRepository.getManufId());
        stepExecution.getJobExecution().getExecutionContext().put("merchantId", merchantId);
        stepExecution.getJobExecution().getExecutionContext().put("GATEWAY_LOG_NUMERATOR",0);
        stepExecution.getJobExecution().getExecutionContext().put("RQUEST_ID",String.format("%s-%s-%s", pid, bsLogId, 0));
        DateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
        stepExecution.getJobExecution().getExecutionContext().put("date", dateFormat.format(new Date()));
     }
    
    
    @Override
    public Transaction process(BillingTransaction item) throws Exception {
    
        Transaction tr;
        try {
            tr = transactionCreator.createTransaction(item, stepExecution);
        } catch (Exception e) {
            logWriter.info(stepExecution, this.getClass().getSimpleName(), e.toString());
            throw new RuntimeException(e);
        }
        setJobExecutionContext(stepExecution.getJobExecution());
    
        return tr;
    }
    

    Here is our implementation of the writer:

    @Component
    public class BillingTransactionWriter implements
        ItemWriter<Transaction>, StepExecutionListener {
    
    private JobExecution jobExecution;
    
    @Autowired
    protected LogWriter logWriter;
    
    @Autowired
    private GatewayLogRepositoryCustom gatewayLogRepositoryCustom;
    
    @Autowired
    private ShovarNumeratorRepository shovarNumeratorRepository;
    
    @Autowired
    private CustomRoutingDS customRoutingDS;
    
    @Autowired
    private EntityManagerDynamicQueryService<String> entityManagerDynamicQueryService;
    
    @Override
    public void beforeStep(@NotNull StepExecution stepExecution) {
        logWriter.info(stepExecution, this.getClass().getSimpleName(), "BillingTransaction writer initialized.");
        jobExecution = stepExecution.getJobExecution();
    
     //   jobExecution.getExecutionContext().put("GATEWAY_LOG_NUMERATOR", 0);
        jobExecution.getExecutionContext().put("INSERT_TOTAL_NUMERATOR", 0);
    
    }
    
    @Override
    public void write(Chunk<? extends Transaction> transactions) {
    
        int incr = Integer.parseInt(Objects.requireNonNull(jobExecution.getExecutionContext().get("GATEWAY_LOG_NUMERATOR")).toString());
        int total = Integer.parseInt(Objects.requireNonNull(jobExecution.getExecutionContext().get("INSERT_TOTAL_NUMERATOR")).toString());
    
        List<Transaction> bts = (List<Transaction>) transactions.getItems();
    
        System.out.printf("Received list of size: %d total insert : %d, GatewayLog Numerator is : %d%n", bts.size(), total, incr);
        total += bts.size();
    
        List<String> transactionIds = insertToGatewayLog(bts);
        insertToTransactions(bts, transactionIds);
    
        jobExecution.getExecutionContext().put("GATEWAY_LOG_NUMERATOR", ++incr);
        jobExecution.getExecutionContext().put("INSERT_TOTAL_NUMERATOR", total);
        String pid = jobExecution.getJobParameters().getString(JOB_PARAMETER_PID);
        String bsLogId = jobExecution.getJobParameters().getString(JOB_PARAMETER_BS_LOG_ID);
        jobExecution.getExecutionContext().put("RQUEST_ID", String.format("%s-%s-%s", pid, bsLogId, incr));
    }
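
To make the counter bookkeeping in write() easier to follow, here is a plain-Java model of it. This is only a sketch: the Map stands in for the job ExecutionContext, the class and method names are hypothetical, and the pid and bsLogId values ("42" and "7") are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the counter bookkeeping in write(); the Map stands in
// for the job ExecutionContext, and the pid/bsLogId values are made up.
final class WriterCountersSketch {

    // Same starting values the job uses for the two counters.
    static Map<String, Object> newContext() {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("GATEWAY_LOG_NUMERATOR", 0);
        ctx.put("INSERT_TOTAL_NUMERATOR", 0);
        return ctx;
    }

    // Called once per chunk: bump the chunk counter, add the chunk size to the
    // running total, and rebuild the request id from the new counter value.
    static void recordChunk(Map<String, Object> ctx, String pid, String bsLogId, int itemsInChunk) {
        int incr = (Integer) ctx.get("GATEWAY_LOG_NUMERATOR") + 1;
        int total = (Integer) ctx.get("INSERT_TOTAL_NUMERATOR") + itemsInChunk;
        ctx.put("GATEWAY_LOG_NUMERATOR", incr);
        ctx.put("INSERT_TOTAL_NUMERATOR", total);
        ctx.put("RQUEST_ID", String.format("%s-%s-%s", pid, bsLogId, incr));
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = newContext();
        for (int chunk = 0; chunk < 100; chunk++) {
            recordChunk(ctx, "42", "7", 10_000);
        }
        System.out.println(ctx.size() + " entries, total=" + ctx.get("INSERT_TOTAL_NUMERATOR")
                + ", requestId=" + ctx.get("RQUEST_ID"));
    }
}
```

In this model the context holds the same few scalar values no matter how many chunks are written, so as far as we can tell this bookkeeping by itself should not account for the heap growth.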
    

    Based on our tests with 1M records, the job needs about 8 GB of RAM available as heap space; otherwise we get the following exception:

    org.springframework.batch.core.step.AbstractStep][taskExecutor-1][ERROR] Encountered an error executing step createTransactions in job batchJob 
    java.lang.OutOfMemoryError: Java heap space
    

    In addition, this memory is not released after the job completes.
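
One caveat when judging "not released": the JVM can keep heap committed (reserved from the operating system) after the objects in it have been collected, so process-level RAM does not necessarily drop when objects are freed. A minimal sketch for telling the two apart, using the standard MemoryMXBean; the class name HeapReport is illustrative and not part of our job.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Illustrative helper (not part of the job): compare used and committed heap.
final class HeapReport {

    // One consistent snapshot of the JVM heap figures.
    static MemoryUsage heap() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    }

    // used      = bytes occupied by objects (live ones plus garbage not yet collected)
    // committed = bytes the JVM currently holds from the operating system
    // max       = upper bound such as -Xmx, or -1 if undefined
    static String report() {
        MemoryUsage u = heap();
        return String.format("used=%d committed=%d max=%d",
                u.getUsed(), u.getCommitted(), u.getMax());
    }

    public static void main(String[] args) {
        System.out.println(report());
    }
}
```

Logging this after the job ends would show whether "used" itself stays high, which would suggest something still references the data, or whether only "committed" stays high.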

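    We expected that using Spring Batch with its built-in chunk-oriented reader and writer would use heap memory efficiently and release the memory once a given chunk has completed.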
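
To make that expectation concrete, here is a simplified plain-Java model of a chunk loop. It is a sketch of the behaviour we expected, not Spring Batch's actual implementation, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of chunk-oriented processing (not Spring Batch's own code).
final class ChunkLoopSketch {

    // Reads, processes, and writes items chunk by chunk; returns the number of chunks written.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // A fresh list per chunk: nothing from the previous chunk is carried over.
            List<O> chunk = new ArrayList<>(chunkSize);
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(processor.apply(reader.next()));
            }
            writer.accept(chunk);
            chunks++;
            // After this iteration the list is unreachable from the loop, so it can be
            // garbage collected unless the reader, processor, or writer kept a reference.
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int n = 0; n < 25; n++) {
            source.add(n);
        }
        Function<Integer, String> processor = n -> "tx-" + n;
        Consumer<List<String>> writer = chunk -> System.out.println("wrote " + chunk.size() + " items");
        System.out.println(run(source.iterator(), processor, writer, 10) + " chunks");
    }
}
```

In this model only one chunk is reachable at a time. What we observe in our job suggests that something outside such a loop keeps references to earlier chunks, and that is what we are trying to identify.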
