1 year ago

#375859

test-img

dave

Spring Batch Remote Partitioning - DeployerPartitionHandler not executing another job in queue

The DeployerPartitionHandler is configured to use at most 3 workers at a time. When job is launched which is using all 3 workers, and at the same time another job is launched it is not executed even after first job is completed and workers are free. The status of the 2nd job is stuck at STARTING.

Below is the Batch Configuration:

@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Value("${jarLocation}")
    public String jarLocation;

    @Value("${batch.job.jobname}")
    public String jobName;

    @Bean
    @Primary
    PlatformTransactionManager getTransactionManager(
            @Qualifier("transactionManager") PlatformTransactionManager platform) {
        return platform;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer, JobRepository jobRepository,
            ConfigurableApplicationContext context) {
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    
    @Bean
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
            Environment environment, DelegatingResourceLoader delegatingResourceLoader, TaskRepository taskRepository) {
        Resource resource = delegatingResourceLoader.getResource(jarLocation);

        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep", taskRepository);
        List<String> commandLineArguments = new ArrayList<>(5);
        commandLineArguments.add("--spring.profiles.active=worker");
        commandLineArguments.add("--spring.cloud.task.initialize.enable=false");
        commandLineArguments.add("--spring.batch.initializer.enabled=false");
        commandLineArguments.add("--spring.cloud.task.closecontextEnabled=true");
        commandLineArguments.add("--logging.level=DEBUG");

        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArguments));
        partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(environment));
        partitionHandler.setMaxWorkers(3);
        partitionHandler.setApplicationName("BatchApplicationWorker");
        return partitionHandler;
        
    }

    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['inputFiles']}") String file,
            @Value("#{jobParameters['partitionSize']}") String partitionSize1) {
        int partitionSize = Integer.parseInt(partitionSize1);
        return new Partitioner() {
            public Map<String, ExecutionContext> partition(int gridSize) {
                Map<String, ExecutionContext> partitions = new HashMap<>();
                String[] ids = fetchAllPrimaryKeys(file);
                List<List<String>> partitionPayloads = splitPayLoad(ids, partitionSize);
                int size = partitionPayloads.size();
                for (int i = 0; i < size; i++) {
                    ExecutionContext executionContext = new ExecutionContext();
                    executionContext.put("partitionNumber", i);
                    executionContext.put("partitionPayLoad", new ArrayList<>(partitionPayloads.get(i)));
                    partitions.put("partition" + i, executionContext);
                }
                return partitions;
            }
        };
    }

    @Bean
    public Step masterStep(Step workerStep, PartitionHandler partitionHandler) {
        return this.stepBuilderFactory.get("masterStep").partitioner(workerStep.getName(), partitioner(null, null))
                .step(workerStep).partitionHandler(partitionHandler).build();
    }

    @Bean
    public Step workerStep(CustomWriter customWriter, CustomProcessor customProcessor) {
        return this.stepBuilderFactory.get("workerStep").<User, User>chunk(10000).reader(reader(null))
                .processor(customProcessor).writer(customWriter).build();
    }

    @Bean
    public Job batchJob(Step masterStep, JobExecutionListnerClass jobExecutionListnerClass,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("batchJob").start(masterStep).listener(jobExecutionListnerClass).build();
    }

    @Bean
    @StepScope
    public CustomReader reader(@Value("#{stepExecutionContext['partitionPayLoad']}") List<String> payload) {
        return new CustomReader(payload);
    }

    @Bean
    public AppTaskListener appTaskListener() {
        return new AppTaskListener();
    }

}

spring-boot

spring-batch

0 Answers

Your Answer

Accepted video resources