代码之家  ›  专栏  ›  技术社区  ›  pixel

Spring批处理中基于异常的简单块作业执行控制

  •  2
  • pixel  · 技术社区  · 6 年前

    我正在做一个简单的数据块处理工作。

    当处理过程中出现特定类型的错误(例如无效的行结构)时,我想更改执行流

    为了防止抛出错误,我需要提供自定义 exceptionHandler 将包含解析异常:

    @Bean
    fun processCsvStep(
        stepBuilderFactory: StepBuilderFactory,
        reader: ItemReader<InputRow>,
        processor: ItemProcessor<InputRow, OutputObject>,
        writer: ItemWriter<OutputObject>
    ) = stepBuilderFactory.get(PROCESS_CSV_STEP)
        .chunk<InputRow, OutputObject>(
            CHUNKS_NUMBER
        )
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .exceptionHandler { context: RepeatContext, throwable: Throwable ->
            context.setTerminateOnly()
            logger.error { "Exception during parsing: ${throwable.message}" }
        }
        .build()!!
    

    那么在我的工作中,我只能依靠回滚计数:

    @Bean
    fun createCsvJob(jobs: JobBuilderFactory, processCsvStep: Step, moveCsvStep: Step, moveFailedCsvStep: Step) = jobs.get(PROCESS_CSV_JOB)
        .start(processCsvStep)
        .next { jobExecution: JobExecution, stepExecution: StepExecution ->
            return@next when (stepExecution.rollbackCount) {
                0 -> FlowExecutionStatus.COMPLETED
                else -> FlowExecutionStatus.FAILED
            }
    
        }
        .on(FlowExecutionStatus.FAILED.name)
        .to(moveFailedCsvStep)
        .on(FlowExecutionStatus.COMPLETED.name)
        .to(moveCsvStep)
        .end()
        .build()!!
    

    JobExecutionDecider

    1 回复  |  直到 6 年前
        1
  •  1
  •   Mahmoud Ben Hassine    6 年前

    我想根据解析过程中发生的异常类型做出执行决策。这可能吗?

    您可以访问从决策者到的步骤中发生的异常 stepExecution#getFailureExceptions . 举个例子:

    import java.util.Arrays;
    import java.util.List;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        @Autowired
        private JobBuilderFactory jobs;
    
        @Autowired
        private StepBuilderFactory steps;
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return items -> {
                for (Integer item : items) {
                    if (items.contains(3)) {
                        throw new IllegalArgumentException("no 3!");
                    }
                    System.out.println("item = " + item);
                }
            };
        }
    
        @Bean
        public Step step1() {
            return steps.get("step1")
                    .<Integer, Integer>chunk(5)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();
        }
    
        @Bean
        public Step step2() {
            return steps.get("step2")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("step2");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public Step step3() {
            return steps.get("step3")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("step3");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public JobExecutionDecider decider() {
            return (jobExecution, stepExecution) -> {
                int rollbackCount = stepExecution.getRollbackCount();
                List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
                System.out.println("rollbackCount = " + rollbackCount);
                System.out.println("failureExceptions = " + failureExceptions);
                // make the decision based on rollbackCount and/or failureExceptions and return status accordingly
                return FlowExecutionStatus.COMPLETED;
            };
        }
    
        @Bean
        public Job job() {
            return jobs.get("job")
                    .start(step1())
                    .on("*").to(decider())
                    .from(decider()).on("COMPLETED").to(step2())
                    .from(decider()).on("FAILED").to(step3())
                    .build()
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
    }
    

    step1 ,决策者可以从步骤执行中获得它并相应地做出决策(转到 step2 step3

    所以我不确定您是否真的需要一个异常处理程序和一种将信息传递给决策者的方法。同样的想法也适用于你想根据 rollbackCount , commitCount readCount ,或任何其他指标。

    希望这有帮助。