
8.4 Spring Batch & Quartz Scheduling

This section covers the real-world pattern of using Spring Batch for large-scale data processing with failure retry and execution history, combined with Quartz Scheduler for recurring job scheduling.


1. Spring Batch Core Architecture

Overall Structure

Job
├── Step 1 (Chunk-Oriented)
│   ├── ItemReader    → Read data from DB/file/API
│   ├── ItemProcessor → Apply business logic (optional)
│   └── ItemWriter    → Write results to DB/file
└── Step 2 (Tasklet)
    └── Simple task (file move, notification, etc.)
Component        Role
Job              The entire batch work unit
Step             An independent execution phase within a Job
ItemReader       Reads data one item at a time from a source
ItemProcessor    Transforms/validates read data
ItemWriter       Persists processed data
JobRepository    Stores execution history and status metadata (DB)
JobLauncher      Entry point for running a Job
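Conceptually, a chunk-oriented step is a read/process/write loop that commits once per chunk. The plain-Java sketch below imitates that flow for intuition only; `ChunkSimulator` is an illustrative stand-in, not Spring Batch's actual internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of the chunk-oriented loop (NOT Spring Batch internals).
public class ChunkSimulator {

    // Reads items one at a time, processes them (null = skip),
    // and "commits" a chunk every chunkSize items.
    static <I, O> List<List<O>> run(List<I> source, Function<I, O> processor, int chunkSize) {
        List<List<O>> committedChunks = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        for (I item : source) {                  // ItemReader: one item at a time
            O processed = processor.apply(item); // ItemProcessor
            if (processed == null) continue;     // returning null means "skip this item"
            chunk.add(processed);
            if (chunk.size() == chunkSize) {     // ItemWriter + transaction commit
                committedChunks.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) committedChunks.add(chunk); // final partial chunk
        return committedChunks;
    }

    public static void main(String[] args) {
        // Quantities; non-positive ones are filtered out, as the real processor does later
        List<Integer> quantities = List.of(3, 0, 5, -2, 7, 4, 1);
        System.out.println(run(quantities, (Integer q) -> q > 0 ? q : null, 2));
        // [[3, 5], [7, 4], [1]]
    }
}
```

This is why chunk size equals commit interval: a failure rolls back only the current in-flight chunk, not the chunks already committed.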

Adding Dependencies

// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-quartz'
runtimeOnly 'com.h2database:h2' // For metadata tables (dev)
runtimeOnly 'org.postgresql:postgresql' // Production DB

2. Chunk-Based Batch — CSV to DB Import Example

A batch job that reads a daily orders.csv file from an external system and saves it to the DB.

Domain Model

// Read DTO
@Getter
@Setter
public class OrderCsvDto {
    private String orderId;
    private String customerId;
    private String productCode;
    private int quantity;
    private BigDecimal price;
    private String orderDate;
}

// Persistence Entity
@Entity
@Table(name = "orders")
@Getter
@NoArgsConstructor
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(unique = true)
    private String orderId;
    private String customerId;
    private String productCode;
    private int quantity;
    private BigDecimal totalAmount;
    private LocalDateTime orderedAt;

    public static Order from(OrderCsvDto dto) {
        Order order = new Order();
        order.orderId = dto.getOrderId();
        order.customerId = dto.getCustomerId();
        order.productCode = dto.getProductCode();
        order.quantity = dto.getQuantity();
        order.totalAmount = dto.getPrice().multiply(BigDecimal.valueOf(dto.getQuantity()));
        order.orderedAt = LocalDateTime.parse(dto.getOrderDate(),
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        return order;
    }
}
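The two conversions inside Order.from (total-amount arithmetic and date parsing) are plain BigDecimal / java.time code and can be sanity-checked in isolation, without the entity or Lombok:

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Standalone check of the arithmetic and parsing used by Order.from.
public class OrderConversionCheck {
    public static void main(String[] args) {
        // totalAmount = price * quantity, with exact decimal arithmetic
        BigDecimal total = new BigDecimal("19.99").multiply(BigDecimal.valueOf(3));
        System.out.println(total); // 59.97

        // orderedAt parsed with the same pattern the entity uses
        LocalDateTime orderedAt = LocalDateTime.parse("2024-01-15 14:30:00",
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(orderedAt); // 2024-01-15T14:30
    }
}
```

Using BigDecimal (not double) for money is deliberate: the product keeps an exact scale of two decimal places.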

ItemReader — FlatFileItemReader (CSV)

@Configuration
@RequiredArgsConstructor
public class OrderBatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final EntityManagerFactory entityManagerFactory;

    // ── ItemReader ──────────────────────────────────────────────
    @Bean
    @StepScope // Bean created at Step execution time (allows JobParameter injection)
    public FlatFileItemReader<OrderCsvDto> orderCsvReader(
            @Value("#{jobParameters['filePath']}") String filePath) {

        BeanWrapperFieldSetMapper<OrderCsvDto> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(OrderCsvDto.class);

        return new FlatFileItemReaderBuilder<OrderCsvDto>()
                .name("orderCsvReader")
                .resource(new FileSystemResource(filePath))
                .delimited()
                .delimiter(",")
                .names("orderId", "customerId", "productCode", "quantity", "price", "orderDate")
                .fieldSetMapper(fieldSetMapper)
                .linesToSkip(1) // Skip header row
                .encoding("UTF-8")
                .build();
    }

    // ── ItemProcessor ───────────────────────────────────────────
    @Bean
    @StepScope
    public ItemProcessor<OrderCsvDto, Order> orderProcessor() {
        return dto -> {
            // Validation: skip items with quantity <= 0
            if (dto.getQuantity() <= 0) {
                return null; // Returning null skips this item
            }
            return Order.from(dto);
        };
    }

    // ── ItemWriter — JPA Batch Save ─────────────────────────────
    @Bean
    public JpaItemWriter<Order> orderJpaWriter() {
        JpaItemWriter<Order> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }

    // ── Step ────────────────────────────────────────────────────
    @Bean
    public Step orderImportStep() {
        return new StepBuilder("orderImportStep", jobRepository)
                .<OrderCsvDto, Order>chunk(500, transactionManager) // Commit every 500 items
                .reader(orderCsvReader(null)) // null is replaced at runtime via @StepScope
                .processor(orderProcessor())
                .writer(orderJpaWriter())
                .faultTolerant()
                .skip(FlatFileParseException.class) // Skip parse errors
                .skipLimit(10)                      // Allow up to 10 skips
                .retry(DataAccessException.class)   // Retry on transient DB errors
                .retryLimit(3)
                .build();
    }

    // ── Job ─────────────────────────────────────────────────────
    @Bean
    public Job orderImportJob(Step orderImportStep, JobExecutionListener listener) {
        return new JobBuilder("orderImportJob", jobRepository)
                .start(orderImportStep)
                .listener(listener)
                .incrementer(new RunIdIncrementer()) // Create a new JobInstance on each run
                .build();
    }
}

JobExecutionListener — Pre/Post Processing

@Slf4j
@Component
public class OrderJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("=== Batch Started: {} / Parameters: {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        long readCount = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getReadCount).sum();
        long writeCount = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getWriteCount).sum();
        long skipCount = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getSkipCount).sum();

        log.info("=== Batch Finished: {} | Read: {} | Write: {} | Skip: {} | Status: {}",
                jobExecution.getJobInstance().getJobName(),
                readCount, writeCount, skipCount, status);

        if (status == BatchStatus.FAILED) {
            // Send alert, handle failure
            log.error("Batch Failed! Cause: {}", jobExecution.getAllFailureExceptions());
        }
    }
}

3. Tasklet-Based Step — File Cleanup Post-Processing

For simple tasks that don't fit the Chunk model (file moves, directory cleanup, etc.), use a Tasklet.

@Slf4j
@Component
@StepScope
public class ArchiveFileTasklet implements Tasklet {

    @Value("#{jobParameters['filePath']}")
    private String filePath;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        Path source = Path.of(filePath);
        Path archive = Path.of(filePath.replace("/incoming/", "/archive/"));

        try {
            Files.createDirectories(archive.getParent());
            Files.move(source, archive, StandardCopyOption.REPLACE_EXISTING);
            log.info("File archived successfully: {} → {}", source, archive);
        } catch (IOException e) {
            throw new RuntimeException("File archiving failed", e);
        }

        return RepeatStatus.FINISHED; // Task complete
    }
}
// Add the Step to OrderBatchConfig
@Bean
public Step archiveFileStep(ArchiveFileTasklet archiveFileTasklet) {
    return new StepBuilder("archiveFileStep", jobRepository)
            .tasklet(archiveFileTasklet, transactionManager)
            .build();
}

// Chain Steps in the Job (this definition replaces the single-step orderImportJob above)
@Bean
public Job orderImportJob(Step orderImportStep, Step archiveFileStep,
                          JobExecutionListener listener) {
    return new JobBuilder("orderImportJob", jobRepository)
            .start(orderImportStep)
            .next(archiveFileStep) // Run Step 2 after Step 1 succeeds
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .build();
}

4. DB to DB Batch — JpaPagingItemReader

Pattern for reading from DB, transforming, and writing back to DB (settlements, aggregations, etc.).

// Monthly order aggregation batch
@Bean
@StepScope
public JpaPagingItemReader<Order> monthlyOrderReader(
        @Value("#{jobParameters['targetMonth']}") String targetMonth) {

    LocalDateTime start = YearMonth.parse(targetMonth).atDay(1).atStartOfDay();
    LocalDateTime end = YearMonth.parse(targetMonth).atEndOfMonth().atTime(23, 59, 59);

    Map<String, Object> params = new HashMap<>();
    params.put("start", start);
    params.put("end", end);

    return new JpaPagingItemReaderBuilder<Order>()
            .name("monthlyOrderReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("""
                    SELECT o FROM Order o
                    WHERE o.orderedAt BETWEEN :start AND :end
                    ORDER BY o.customerId
                    """)
            .parameterValues(params)
            .pageSize(1000) // Paginate in batches of 1,000
            .build();
}
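The targetMonth window computation above is plain java.time and can be verified without Spring; note that atEndOfMonth correctly handles varying month lengths, including leap-year February:

```java
import java.time.LocalDateTime;
import java.time.YearMonth;

// Standalone check of the month-window computation used by monthlyOrderReader.
public class MonthWindowCheck {
    public static void main(String[] args) {
        String targetMonth = "2024-02"; // leap-year February

        LocalDateTime start = YearMonth.parse(targetMonth).atDay(1).atStartOfDay();
        LocalDateTime end = YearMonth.parse(targetMonth).atEndOfMonth().atTime(23, 59, 59);

        System.out.println(start); // 2024-02-01T00:00
        System.out.println(end);   // 2024-02-29T23:59:59
    }
}
```

One caveat worth knowing: ending the window at 23:59:59 excludes rows stamped in the final second's sub-second range; an exclusive upper bound (`orderedAt < firstDayOfNextMonth`) avoids that edge case.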

5. Batch Metadata DB Configuration

Spring Batch stores Job execution history in the DB. In production, always use a real DB.

# application.yml
spring:
  batch:
    job:
      enabled: false # Prevent auto-run on startup (Quartz will trigger jobs instead)
    jdbc:
      initialize-schema: always # Auto-create metadata tables (dev)
      # initialize-schema: never  # Use 'never' in production
  datasource:
    url: jdbc:postgresql://localhost:5432/batchdb
    username: batch_user
    password: secret

To create the metadata tables manually in a production DB, run org/springframework/batch/core/schema-postgresql.sql (included inside the spring-batch-core jar).

6. Quartz Scheduler Integration

What is Quartz?

Quartz is the most widely used enterprise-grade scheduling library in the Java ecosystem. Comparison with @Scheduled:

Feature                 @Scheduled         Quartz
Cluster support         ❌ Not available    ✅ DB-based cluster
Execution history       ❌ None             ✅ Stored in DB
Dynamic registration    ❌ Not possible     ✅ Modifiable at runtime
Misfire on restart      ❌ Missed           ✅ Configurable policy
Complex Cron            △ Limited          ✅ Full support

Quartz Configuration

# application.yml
spring:
  quartz:
    job-store-type: jdbc # Store jobs/triggers/state in the DB (required for clustering)
    # job-store-type: memory # In-memory (single instance, dev only)
    jdbc:
      initialize-schema: always # Auto-create Quartz metadata tables
    properties:
      org.quartz.scheduler.instanceName: BatchScheduler
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.isClustered: true # Enable cluster mode
      org.quartz.jobStore.clusterCheckinInterval: 10000
      org.quartz.threadPool.threadCount: 5 # Concurrent execution threads

Quartz Job That Runs a Spring Batch Job

/**
 * Bridge: Quartz Job → Spring Batch Job execution.
 * Retrieves the JobLauncher and Job beans from the ApplicationContext and runs them.
 */
@Slf4j
public class SpringBatchQuartzJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            // Get the Spring ApplicationContext stored in the Quartz SchedulerContext
            ApplicationContext appContext = (ApplicationContext)
                    context.getScheduler().getContext().get("applicationContext");

            JobLauncher jobLauncher = appContext.getBean(JobLauncher.class);
            org.springframework.batch.core.Job batchJob =
                    appContext.getBean(context.getMergedJobDataMap().getString("jobName"),
                            org.springframework.batch.core.Job.class);

            // Build JobParameters (must be unique for each run)
            JobParameters params = new JobParametersBuilder()
                    .addString("filePath", resolveFilePath())
                    .addString("targetMonth", YearMonth.now().minusMonths(1).toString())
                    .addLong("timestamp", System.currentTimeMillis()) // Uniqueness parameter
                    .toJobParameters();

            JobExecution execution = jobLauncher.run(batchJob, params);
            log.info("Batch execution complete: {} → {}", batchJob.getName(), execution.getStatus());

        } catch (Exception e) {
            log.error("Quartz Job execution failed", e);
            throw new JobExecutionException(e);
        }
    }

    private String resolveFilePath() {
        String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        return "/data/incoming/orders_" + today + ".csv";
    }
}
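The parameter values built in execute() are plain java.time; substituting fixed dates for now() makes them deterministic and easy to verify, including the previous-month rollover across a year boundary:

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

// Standalone check of the JobParameter values SpringBatchQuartzJob computes,
// using fixed dates in place of now() so the results are deterministic.
public class JobParamCheck {
    public static void main(String[] args) {
        LocalDate runDate = LocalDate.of(2024, 3, 5);
        String filePath = "/data/incoming/orders_"
                + runDate.format(DateTimeFormatter.ofPattern("yyyyMMdd")) + ".csv";
        System.out.println(filePath); // /data/incoming/orders_20240305.csv

        // targetMonth is always the previous month, even across a year boundary
        String targetMonth = YearMonth.of(2024, 1).minusMonths(1).toString();
        System.out.println(targetMonth); // 2023-12
    }
}
```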

Injecting ApplicationContext into Quartz SchedulerContext

@Configuration
public class QuartzConfig {

    /**
     * Injects the Spring ApplicationContext into the Quartz SchedulerContext
     * so SpringBatchQuartzJob can look up Spring beans at execution time.
     * (SchedulerFactoryBean already holds the context via ApplicationContextAware;
     * this just exposes it under the given SchedulerContext key.)
     */
    @Bean
    public SchedulerFactoryBeanCustomizer schedulerFactoryBeanCustomizer() {
        return schedulerFactoryBean ->
                schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
    }
}

Registering JobDetail & Trigger

@Configuration
public class OrderBatchSchedulerConfig {

    // ── JobDetail: defines what to run ────────────────────────────
    @Bean
    public JobDetail orderImportJobDetail() {
        JobDataMap dataMap = new JobDataMap();
        dataMap.put("jobName", "orderImportJob");

        return JobBuilder.newJob(SpringBatchQuartzJob.class)
                .withIdentity("orderImportJobDetail", "batchGroup")
                .withDescription("Daily order CSV import batch")
                .usingJobData(dataMap)
                .storeDurably() // Keep the JobDetail even without a Trigger
                .build();
    }

    // ── Trigger: defines when to run ──────────────────────────────
    @Bean
    public Trigger orderImportTrigger(JobDetail orderImportJobDetail) {
        return TriggerBuilder.newTrigger()
                .forJob(orderImportJobDetail)
                .withIdentity("orderImportTrigger", "batchGroup")
                .withDescription("Run daily at 2 AM")
                .withSchedule(
                        CronScheduleBuilder
                                .cronSchedule("0 0 2 * * ?") // sec min hour day-of-month month day-of-week
                                .withMisfireHandlingInstructionFireAndProceed() // Fire once immediately on misfire
                )
                .build();
    }

    // ── Monthly settlement batch (1st of each month at 3 AM) ──────
    @Bean
    public JobDetail monthlySettlementJobDetail() {
        JobDataMap dataMap = new JobDataMap();
        dataMap.put("jobName", "monthlySettlementJob");

        return JobBuilder.newJob(SpringBatchQuartzJob.class)
                .withIdentity("monthlySettlementJobDetail", "batchGroup")
                .usingJobData(dataMap)
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger monthlySettlementTrigger(JobDetail monthlySettlementJobDetail) {
        return TriggerBuilder.newTrigger()
                .forJob(monthlySettlementJobDetail)
                .withIdentity("monthlySettlementTrigger", "batchGroup")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 3 1 * ?"))
                .build();
    }
}

7. Quartz Cron Expression Quick Reference

┌───────────── Second (0-59)
│ ┌─────────── Minute (0-59)
│ │ ┌───────── Hour (0-23)
│ │ │ ┌─────── Day of month (1-31)
│ │ │ │ ┌───── Month (1-12 or JAN-DEC)
│ │ │ │ │ ┌─── Day of week (1-7 or SUN-SAT)
│ │ │ │ │ │
* * * * * *
Expression            Description
0 0 2 * * ?           Every day at 2 AM
0 30 8 * * MON-FRI    Weekdays at 8:30 AM
0 0 3 1 * ?           1st of every month at 3 AM
0 0/30 9-18 * * ?     Every 30 minutes between 9 AM and 6 PM
0 0 12 ? * SUN        Every Sunday at noon
0 0 0 L * ?           Last day of every month at midnight
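For a simple schedule like 0 0 2 * * ?, "next fire time" is easy to reason about by hand. The java.time sketch below (not the Quartz API; Quartz computes this itself via its CronExpression class) finds the next 2 AM after a given instant:

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

// java.time sketch of "next fire time" for the daily 2 AM cron (0 0 2 * * ?).
// For reasoning only; Quartz resolves fire times internally.
public class NextFireSketch {

    static LocalDateTime nextTwoAm(LocalDateTime now) {
        LocalDateTime todayTwoAm = now.toLocalDate().atTime(LocalTime.of(2, 0));
        // If 2 AM today has already passed (or is exactly now), fire tomorrow
        return now.isBefore(todayTwoAm) ? todayTwoAm : todayTwoAm.plusDays(1);
    }

    public static void main(String[] args) {
        System.out.println(nextTwoAm(LocalDateTime.parse("2024-01-15T01:30"))); // 2024-01-15T02:00
        System.out.println(nextTwoAm(LocalDateTime.parse("2024-01-15T03:00"))); // 2024-01-16T02:00
    }
}
```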

8. Manual Job Execution (REST API Trigger)

For emergency reprocessing in production, trigger a batch job immediately via API.

@RestController
@RequestMapping("/admin/batch")
@RequiredArgsConstructor
public class BatchAdminController {

    private final JobLauncher jobLauncher;
    private final Job orderImportJob;
    private final Scheduler quartzScheduler;

    // Directly execute a Spring Batch Job
    @PostMapping("/order-import")
    public ResponseEntity<String> runOrderImport(
            @RequestParam String filePath) throws Exception {

        JobParameters params = new JobParametersBuilder()
                .addString("filePath", filePath)
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(orderImportJob, params);
        return ResponseEntity.ok("Batch status: " + execution.getStatus());
    }

    // Immediately fire a Quartz Job outside its schedule
    // (e.g. jobName = "orderImportJob" resolves to the key "orderImportJobDetail")
    @PostMapping("/trigger/{jobName}")
    public ResponseEntity<String> triggerJobNow(
            @PathVariable String jobName) throws SchedulerException {

        quartzScheduler.triggerJob(
                new JobKey(jobName + "Detail", "batchGroup")
        );
        return ResponseEntity.ok("Quartz Job fired immediately");
    }

    // List all Quartz Jobs
    @GetMapping("/jobs")
    public ResponseEntity<List<Map<String, Object>>> listJobs() throws SchedulerException {
        List<Map<String, Object>> result = new ArrayList<>();

        for (String groupName : quartzScheduler.getJobGroupNames()) {
            for (JobKey jobKey : quartzScheduler.getJobKeys(GroupMatcher.groupEquals(groupName))) {
                for (Trigger trigger : quartzScheduler.getTriggersOfJob(jobKey)) {
                    Map<String, Object> info = new LinkedHashMap<>();
                    info.put("job", jobKey.getName());
                    info.put("group", jobKey.getGroup());
                    info.put("nextFire", trigger.getNextFireTime());
                    info.put("previousFire", trigger.getPreviousFireTime());
                    result.add(info);
                }
            }
        }
        return ResponseEntity.ok(result);
    }
}

9. Multi-Thread Partitioning (Parallel Processing at Scale)

For 100M+ records, use partitioning to split data and process in parallel.

// Assumption: totalCount is looked up beforehand (e.g. a COUNT query on the source table)
@Bean
public Step partitionedOrderStep(Step orderImportStep) {
    return new StepBuilder("partitionedOrderStep", jobRepository)
            .partitioner("orderImportStep", new RangePartitioner(totalCount)) // split into ID ranges
            .step(orderImportStep) // worker step's reader should bind minId/maxId from the StepExecutionContext
            .taskExecutor(new SimpleAsyncTaskExecutor()) // Async parallel execution
            .gridSize(10) // 10 partitions
            .build();
}

// Partition range splitter: divides [1, totalCount] into gridSize ID ranges
public class RangePartitioner implements Partitioner {

    private final long totalCount;

    public RangePartitioner(long totalCount) {
        this.totalCount = totalCount;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        long pageSize = totalCount / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", i * pageSize + 1);
            // The last partition absorbs the division remainder so no rows are dropped
            context.putLong("maxId", (i == gridSize - 1) ? totalCount : (i + 1) * pageSize);
            partitions.put("partition_" + i, context);
        }
        return partitions;
    }
}
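The range arithmetic is worth checking standalone: with a naive totalCount / gridSize split, the integer division remainder (e.g. rows 101-105 of 105) would never be read, so the last partition must absorb it. A plain-Java check of that split (`RangeSplitCheck` is an illustrative stand-in using long[] ranges instead of Spring Batch's ExecutionContext):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone check of the ID-range split, with the last partition absorbing
// the remainder so that e.g. 105 rows across 10 partitions lose nothing.
public class RangeSplitCheck {

    static Map<String, long[]> split(long totalCount, int gridSize) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        long pageSize = totalCount / gridSize;
        for (int i = 0; i < gridSize; i++) {
            long minId = i * pageSize + 1;
            long maxId = (i == gridSize - 1) ? totalCount : (i + 1) * pageSize;
            partitions.put("partition_" + i, new long[]{minId, maxId});
        }
        return partitions;
    }

    public static void main(String[] args) {
        long[] last = split(105, 10).get("partition_9");
        System.out.println(last[0] + ".." + last[1]); // 91..105 (remainder kept)
    }
}
```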

Pro Tips: Production Batch Checklist

Batch Design Principles

  • Set the chunk size between 500 and 2,000 based on memory usage and transaction size
  • @StepScope / @JobScope beans are created per Step/Job execution, so each execution (including parallel partitioned steps) gets its own isolated instance
  • Always add a timestamp to JobParameters: the parameters identify the JobInstance, so a unique value lets the same Job be re-run

Quartz Cluster Operations

  • job-store-type: jdbc lets multiple instances coordinate through the DB, so each trigger fires on exactly one node
  • instanceId: AUTO automatically assigns a unique ID to each node
  • Always set the misfire policy explicitly; for cron triggers, MISFIRE_INSTRUCTION_FIRE_ONCE_NOW (the builder's withMisfireHandlingInstructionFireAndProceed) is a sensible default

Monitoring

  • Integrate Spring Boot Actuator + /actuator/metrics for Spring Batch monitoring
  • Implement Slack/email alerts on batch failure in JobExecutionListener.afterJob()
  • Quartz execution history can be queried directly from QRTZ_* tables

Use JdbcTemplate.batchUpdate() for simple DB batch inserts, and the Spring Batch + Quartz combination for complex business batch workflows.
