代码之家  ›  专栏  ›  技术社区  ›  Pandit Biradar

内存中的Spring批处理(MapJobRepositoryFactoryBean)清除旧作业而不是正在运行的作业

  •  0
  • Pandit Biradar  · 技术社区  · 5 年前

    我正在使用 Spring Batch 在内存中调度批处理作业——这是项目的特定需求(即不在生产环境中,仅用于测试环境),下面是我的配置类:

    // Batch Scheduler class
        package org.learning.scheduler
        import org.springframework.batch.core.explore.JobExplorer;
        import org.springframework.batch.core.explore.support.SimpleJobExplorer;
        import org.springframework.batch.core.launch.support.SimpleJobLauncher;
        import org.springframework.batch.core.repository.JobRepository;
        import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
        import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.core.task.SimpleAsyncTaskExecutor;
        import org.springframework.scheduling.annotation.EnableScheduling;
    
        /**
         * In-memory Spring Batch infrastructure configuration: a map-based
         * JobRepository, a JobExplorer sharing the same DAOs, and an
         * asynchronous JobLauncher.
         */
        @EnableScheduling
        @Configuration
        public class InmemoryJobConfig  {

            /** Transaction manager that performs no actual transaction work (in-memory use). */
            @Bean
            public ResourcelessTransactionManager transactionManager() {
                return new ResourcelessTransactionManager();
            }

            /**
             * Factory for the map-based (in-memory) job repository.
             * afterPropertiesSet() is invoked eagerly so the underlying DAOs
             * exist before the dependent beans below read them.
             */
            @Bean
            public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean(ResourcelessTransactionManager resourcelessTransactionManager) throws Exception {
                final MapJobRepositoryFactoryBean factory =
                        new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
                factory.afterPropertiesSet();
                return factory;
            }

            /** Job repository produced by the in-memory factory above. */
            @Bean
            public JobRepository jobRepository(MapJobRepositoryFactoryBean factoryBean) throws Exception {
                return (JobRepository) factoryBean.getObject();
            }

            /** Explorer built over the factory's DAOs, so it sees the same in-memory data. */
            @Bean
            public JobExplorer jobExplorer(MapJobRepositoryFactoryBean repositoryFactory) {
                return new SimpleJobExplorer(
                        repositoryFactory.getJobInstanceDao(),
                        repositoryFactory.getJobExecutionDao(),
                        repositoryFactory.getStepExecutionDao(),
                        repositoryFactory.getExecutionContextDao());
            }

            /** Launcher running jobs asynchronously via SimpleAsyncTaskExecutor. */
            @Bean
            public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
                final SimpleJobLauncher launcher = new SimpleJobLauncher();
                launcher.setJobRepository(jobRepository);
                launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
                return launcher;
            }
        }
    
    //Job Configuration Class
    
    /**
     * Batch Entry Point for Scheduler for all Jobs
     *
     * 
     */
    @Import({InmemoryJobConfig.class})
    @EnableBatchProcessing
    @Configuration
    @Slf4j
    public class BatchScheduler {
    
    
        @Autowired
        private JobBuilderFactory jobs;
    
        @Autowired
        private StepBuilderFactory steps;
    
        @Autowired
        private SimpleJobLauncher jobLauncher;
    
    
        @Autowired
        private JobExplorer jobExplorer;
    
        @Autowired
        private MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean;
    
    
        @Bean
        public ItemReader<UserDTO> userReader() {
            return new UserReader();
    
        }
    
        @Bean
        public ItemWriter<User> userWriter() {
            return new UserWriter();
    
        }
    
        @Bean
        public ItemReader<OrderDTO> orderReader() {
            return new OrderReader();
        }
    
        @Bean
        public ItemWriter<Order> orderWriter() {
            return new OrderWriter();
        }
    
        @Bean
        public Step userStep(ItemReader<UserDTO> reader, ItemWriter<User> writer) {
            return steps.get("userStep")
                    .<UserDTO, User>chunk(20)
                    .reader(userReader())
                    .processor(new UserProcessor())
                    .writer(userWriter())
                    .build();
        }
    
        @Bean
        public Step orderStep(ItemReader<OrderDTO> reader, ItemWriter<Order> writer) {
            return steps.get("orderStep")
                    .<OrderDTO, Order>chunk(20)
                    .reader(orderReader())
                    .processor(new OrderProcessor())
                    .writer(orderWriter())
                    .build();
        }
    
    
        @Bean
        public Job userJob() {
            return jobs.get("userJob").incrementer(new RunIdIncrementer()).start(userStep(userReader(), userWriter())).build();
        }
    
        @Bean
        public Job orderJob() {
            return jobs.get("orderJob").incrementer(new RunIdIncrementer()).start(orderStep(orderReader(), orderWriter())).build();
        }
    
    
        @Scheduled(cron = "0 0/15 * * *  ?")
        public void scheduleUserJob() throws JobExecutionException {
            Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("userJob");
    
            if (!runningJob.isEmpty()) {
                throw new JobExecutionException(" User Job  is already in Start State  ");
            }
    
            JobParameters userParam =
                    new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                            .toJobParameters();
            jobLauncher.run(userJob(), userParam);
    
        }
    
        @Scheduled(cron = "0 0/15 * * *  ?")
        public void scheduleOrderJob() throws JobExecutionException {
            Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("orderJob");
    
            if (!runningJob.isEmpty()) {
                throw new JobExecutionException(" Order Job  is already in Start State  ");
            }
    
            JobParameters orderParam =
                    new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                            .toJobParameters();
            jobLauncher.run(orderJob(), orderParam);
    
        }
    
        @Scheduled(cron = "0 0/30 * * *  ?")
        public void scheduleCleanupMemoryJob() throws BatchException {
            Set<JobExecution> orderRunningJob = jobExplorer.findRunningJobExecutions("orderJob");
            Set<JobExecution> userRunningJob = jobExplorer.findRunningJobExecutions("userJob");
            if (!orderRunningJob.isEmpty() || !userRunningJob.isEmpty()) {
                throw new BatchException(" Order/user Job  is running state , cleanup job is aborted  ");
            }
    
            mapJobRepositoryFactoryBean.clear();
    
        }
    }
    

    我每0/15分钟安排两个作业,这将执行一些业务逻辑,并且我已经安排了内存清理作业,仅当这两个作业中的任何一个不是运行状态时,才从“mapJobRepositoryFactoryBean”bean中清理内存中的作业数据。

    我希望获得关于删除已执行完毕的旧作业的最佳方法的建议;如果其中任何一个作业处于运行状态,上述方法就不会删除旧作业的详细信息。

    或者,在spring批处理中是否有任何API,以便在执行作业后从内存中清除特定的作业详细信息。?ie按JobId清除内存

    注意:我只想使用 MapJobRepositoryFactoryBean,而不是持久数据库或任何嵌入式数据库(H2)。

    0 回复  |  直到 5 年前
        1
  •  1
  •   Mahmoud Ben Hassine    5 年前

    MapJobRepository 提供了 clear() 方法,可以清除基于映射的作业存储库中的所有数据,但我没有看到任何删除特定作业元数据的明显方法。

    我只想使用MapJobRepositoryFactoryBean,而不是持久数据库或任何嵌入式数据库(H2)