1 year ago
#375859
dave
Spring Batch Remote Partitioning - DeployerPartitionHandler not executing another job in queue
The DeployerPartitionHandler is configured to use at most 3 workers at a time. When a job that uses all 3 workers is launched and another job is launched at the same time, the second job is not executed even after the first job has completed and the workers are free. The status of the second job stays stuck at STARTING.
Below is the Batch Configuration:
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${jarLocation}")
public String jarLocation;
@Value("${batch.job.jobname}")
public String jobName;
@Bean
@Primary
PlatformTransactionManager getTransactionManager(
@Qualifier("transactionManager") PlatformTransactionManager platform) {
return platform;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer, JobRepository jobRepository,
ConfigurableApplicationContext context) {
return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
Environment environment, DelegatingResourceLoader delegatingResourceLoader, TaskRepository taskRepository) {
Resource resource = delegatingResourceLoader.getResource(jarLocation);
DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository);
List<String> commandLineArguments = new ArrayList<>(5);
commandLineArguments.add("--spring.profiles.active=worker");
commandLineArguments.add("--spring.cloud.task.initialize.enable=false");
commandLineArguments.add("--spring.batch.initializer.enabled=false");
commandLineArguments.add("--spring.cloud.task.closecontextEnabled=true");
commandLineArguments.add("--logging.level=DEBUG");
partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArguments));
partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(environment));
partitionHandler.setMaxWorkers(3);
partitionHandler.setApplicationName("BatchApplicationWorker");
return partitionHandler;
}
@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobParameters['inputFiles']}") String file,
@Value("#{jobParameters['partitionSize']}") String partitionSize1) {
int partitionSize = Integer.parseInt(partitionSize1);
return new Partitioner() {
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>();
String[] ids = fetchAllPrimaryKeys(file);
List<List<String>> partitionPayloads = splitPayLoad(ids, partitionSize);
int size = partitionPayloads.size();
for (int i = 0; i < size; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put("partitionNumber", i);
executionContext.put("partitionPayLoad", new ArrayList<>(partitionPayloads.get(i)));
partitions.put("partition" + i, executionContext);
}
return partitions;
}
};
}
@Bean
public Step masterStep(Step workerStep, PartitionHandler partitionHandler) {
return this.stepBuilderFactory.get("masterStep").partitioner(workerStep.getName(), partitioner(null, null))
.step(workerStep).partitionHandler(partitionHandler).build();
}
@Bean
public Step workerStep(CustomWriter customWriter, CustomProcessor customProcessor) {
return this.stepBuilderFactory.get("workerStep").<User, User>chunk(10000).reader(reader(null))
.processor(customProcessor).writer(customWriter).build();
}
@Bean
public Job batchJob(Step masterStep, JobExecutionListnerClass jobExecutionListnerClass,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("batchJob").start(masterStep).listener(jobExecutionListnerClass).build();
}
@Bean
@StepScope
public CustomReader reader(@Value("#{stepExecutionContext['partitionPayLoad']}") List<String> payload) {
return new CustomReader(payload);
}
@Bean
public AppTaskListener appTaskListener() {
return new AppTaskListener();
}
}
spring-boot
spring-batch
0 Answers
Your Answer