In this article, we'll see how to use Spring Batch's Partitioner class to efficiently parallelize your jobs, split the workload smartly, and speed up I/O-bound operations like database access.
First, make sure your pom.xml includes the necessary dependencies: Spring Batch, Spring Data JPA, the PostgreSQL driver, and Lombok. You can find the complete pom.xml in the project repository.
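As a sketch, the core dependencies are likely along these lines (versions are managed by the Spring Boot parent; the exact entries may differ from the repository's pom.xml):

<dependencies>
    <!-- Spring Batch job infrastructure -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- JPA / Hibernate for the stock entity and repository -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- PostgreSQL JDBC driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Lombok for the boilerplate annotations used below -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>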
Next, configure your application.yml (or application.properties) with the following settings:
spring:
  application:
    name: PartitionerTest
  batch:
    jdbc:
      initialize-schema: always
  datasource:
    url: "jdbc:postgresql://localhost:5432/postgres"
    username: "postgres"
    password: "postgres"
  jpa:
    hibernate:
      ddl-auto: update
Here's what we're telling Spring Boot:

- spring.application.name: the name of our application
- spring.batch.jdbc.initialize-schema: always: initializes the Spring Batch tables (used to store job and step execution metadata)
- spring.datasource.*: configures the database connection
- spring.jpa.hibernate.ddl-auto: update: aligns the database schema with our JPA entity definitions
The stock data is mapped to the following JPA entity:

@Setter
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(
        name = "stock",
        uniqueConstraints = {@UniqueConstraint(columnNames = {"ticker", "date"})},
        indexes = {@Index(columnList = "ticker")}
)
public class StockEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE)
    private Long id;

    @Column(name = "ticker", length = 10, nullable = false)
    private String ticker;

    @Column(name = "date", nullable = false)
    private LocalDate date;

    @Column(name = "low", nullable = false, precision = 50, scale = 20)
    private BigDecimal low;

    @Column(name = "open", nullable = false, precision = 50, scale = 20)
    private BigDecimal open;

    @Column(name = "volume", nullable = false, precision = 50)
    private BigInteger volume;

    @Column(name = "high", nullable = false, precision = 50, scale = 20)
    private BigDecimal high;

    @Column(name = "close", nullable = false, precision = 50, scale = 20)
    private BigDecimal close;

    @Column(name = "adjusted_close", nullable = false, precision = 50, scale = 20)
    private BigDecimal adjustedClose;
}
A few notes:

- the combination of ticker and date is enforced as a unique constraint to avoid duplicate entries for the same stock on the same day
- I used a dedicated job (FileLoaderFlow, available in the repository) to parse and load the CSV files into the stock table
I'll skip the details of this import step, as it's not the focus of this article.
Starting from the data in the stock table, we want to compute the mean price, defined as the average of the high and low prices: mean = (high + low) / 2.
To do this, we define a JpaPagingItemReader that selects only the necessary fields, projecting them into a DTO:
@Bean
@StepScope
public JpaPagingItemReader<StockMeanDTO> itemReader(
        EntityManagerFactory entityManagerFactory
) {
    return new JpaPagingItemReaderBuilder<StockMeanDTO>()
            .name("meanReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT new com.davide99.partitionertest.mean.StockMeanDTO(e.id, e.high, e.low) " +
                    "FROM StockEntity e ORDER BY e.id")
            .pageSize(PAGE_SIZE)
            .build();
}
Where StockMeanDTO looks like this:
@Getter
public class StockMeanDTO {

    private final Long id;
    private final BigDecimal mean;

    public StockMeanDTO(Long id, BigDecimal high, BigDecimal low) {
        this.id = id;
        // mean = (high + low) / 2, rounded half-up
        this.mean = high.add(low)
                .divide(BigDecimal.valueOf(2), RoundingMode.HALF_UP);
    }
}
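To make the arithmetic concrete, here's an illustrative use with made-up values:

StockMeanDTO dto = new StockMeanDTO(1L, new BigDecimal("151.00"), new BigDecimal("149.00"));
System.out.println(dto.getMean()); // 150.00
// HALF_UP only matters when the division is inexact at the operands' scale,
// e.g. (1.00 + 1.01) / 2 = 1.005 -> 1.01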
We then add a mean field to the entity:
@Column(name = "mean", nullable = false, precision = 50, scale = 20)
@ColumnDefault("0")
private BigDecimal mean = BigDecimal.ZERO;
We also add an update method to the repository:

public interface StockRepository extends JpaRepository<StockEntity, Long> {

    @Modifying
    @Query("UPDATE StockEntity e SET e.mean = :mean WHERE e.id = :id")
    void updateMeanById(@Param("mean") BigDecimal mean, @Param("id") Long id);
}
The ItemWriter simply invokes it for each item in the chunk:

@Bean
public ItemWriter<StockMeanDTO> itemWriter(StockRepository repository) {
    return chunk -> chunk.forEach(
            dto -> repository.updateMeanById(dto.getMean(), dto.getId())
    );
}
With this setup, we now have:

- a reader that fetches data and computes the mean
- a writer that performs an update for each record
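For reference, wiring these into a plain single-threaded chunk step looks roughly like this (a sketch: the meanStep bean name is my assumption, the actual flow is in the repository):

@Bean
public Step meanStep(
        ItemReader<StockMeanDTO> reader,
        ItemWriter<StockMeanDTO> writer,
        PlatformTransactionManager transactionManager,
        JobRepository jobRepository
) {
    // A single thread reads pages of DTOs and updates them chunk by chunk
    return new StepBuilder("meanStep", jobRepository)
            .<StockMeanDTO, StockMeanDTO>chunk(CHUNK_SIZE, transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}

This works, but one thread does all the work, and most of its time goes to waiting on database I/O.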
To parallelize the work, we add a master step backed by a TaskExecutor, which will run multiple slave steps in parallel. Each slave will process a subset of the data defined by a key: in our case, the ticker.
The ticker is a perfect candidate, as it's already part of our data and defines a natural subset.
@Query("SELECT DISTINCT e.ticker FROM StockEntity e")
Set<String> findAllTickers();
Then we implement a Partitioner, which adds the ticker value to the ExecutionContext of each partition:
@Component
@RequiredArgsConstructor
@Profile(MeanPartitionedFlow.JOB_NAME)
public class TickerPartitioner implements Partitioner {

    private final StockRepository repository;

    @Override
    @NonNull
    public Map<String, ExecutionContext> partition(int gridSize) {
        return repository.findAllTickers().stream().collect(Collectors.toMap(
                ticker -> ticker,
                ticker -> {
                    final ExecutionContext context = new ExecutionContext();
                    context.putString("ticker", ticker);
                    return context;
                }
        ));
    }
}
Note: we're ignoring the gridSize parameter. If there are n tickers, we create n partitions. The actual parallelism is controlled by the thread pool (TaskExecutor), which can run up to m < n tasks concurrently.
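For example, with hypothetical tickers AAPL and MSFT in the table, the partitioner produces one partition per ticker:

Map<String, ExecutionContext> partitions = tickerPartitioner.partition(1); // gridSize is ignored
// partitions.keySet()                        -> ["AAPL", "MSFT"]
// partitions.get("AAPL").getString("ticker") -> "AAPL"
// Each entry becomes one slave step execution (the map key ends up in its name).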
The master step ties the partitioner, the slave step, and the task executor together:

@Bean
public Step masterStep(
        JobRepository jobRepository,
        TickerPartitioner partitioner,
        Step slaveStep,
        TaskExecutor taskExecutor
) {
    return new StepBuilder(MASTER_STEP_NAME, jobRepository)
            .partitioner(SLAVE_STEP_NAME, partitioner)
            .step(slaveStep)
            .taskExecutor(taskExecutor)
            .gridSize(1) // Unused in our case
            .build();
}
The slave step is a standard chunk-oriented step:

@Bean
public Step slaveStep(
        ItemReader<StockMeanDTO> reader,
        ItemWriter<StockMeanDTO> writer,
        PlatformTransactionManager transactionManager,
        JobRepository jobRepository
) {
    return new StepBuilder(SLAVE_STEP_NAME, jobRepository)
            .<StockMeanDTO, StockMeanDTO>chunk(CHUNK_SIZE, transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}
The reader now receives the ticker from the step's ExecutionContext and filters the query accordingly:

@Bean
@StepScope
public JpaPagingItemReader<StockMeanDTO> itemReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{stepExecutionContext['ticker']}") String ticker
) {
    return new JpaPagingItemReaderBuilder<StockMeanDTO>()
            .name("meanReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT new com.davide99.partitionertest.mean.StockMeanDTO(e.id, e.high, e.low) " +
                    "FROM StockEntity e " +
                    "WHERE e.ticker = :ticker " +
                    "ORDER BY e.id")
            .parameterValues(Map.of("ticker", ticker))
            .pageSize(PAGE_SIZE)
            .build();
}
Parallelism is provided by a ThreadPoolTaskExecutor:

@Bean
public TaskExecutor taskExecutor() {
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    // With the default unbounded queue, extra partitions wait in the queue,
    // so at most corePoolSize (8) partitions actually run concurrently.
    return executor;
}
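Finally, a Job has to launch the master step. A minimal sketch (the meanPartitionedJob bean name is my assumption; JOB_NAME is the same constant used in the @Profile above):

@Bean
public Job meanPartitionedJob(JobRepository jobRepository, Step masterStep) {
    return new JobBuilder(MeanPartitionedFlow.JOB_NAME, jobRepository)
            .start(masterStep)
            .build();
}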
One last detail: every concurrent slave holds its own database connection during its chunk transaction, so we raise HikariCP's pool size accordingly:

spring:
  datasource:
    url: "jdbc:postgresql://localhost:5432/postgres"
    username: "postgres"
    password: "postgres"
    hikari:
      maximum-pool-size: 30
This ensures that enough connections are available for concurrent processing.
With this approach, Spring Batch becomes a high-performance, modular solution even for very large datasets.
The project repository can be found here.