@Configuration
public class ParallelJobConfiguration extends BaseJobConfiguration {
/**
 * Step-scoped reader for the delimited file {@code data/csv/bigtransactions.csv}.
 *
 * <p>Every record is split into three named fields ({@code account},
 * {@code amount}, {@code timestamp}) which are copied onto a freshly created
 * {@link Transaction}. The reader is built with {@code saveState(false)}.
 *
 * @return the configured flat-file reader
 */
@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader() {
    String[] columns = {"account", "amount", "timestamp"};

    return new FlatFileItemReaderBuilder<Transaction>()
            .resource(new FileSystemResource("data/csv/bigtransactions.csv"))
            .saveState(false)
            .delimited()
            .names(columns)
            .fieldSetMapper(fields -> {
                Transaction tx = new Transaction();
                tx.setAccount(fields.readString("account"));
                tx.setAmount(fields.readBigDecimal("amount"));
                // The timestamp column is parsed with the pattern yyyy-MM-dd HH:mm:ss.
                tx.setTimestamp(fields.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));
                return tx;
            })
            .build();
}
/**
 * Step-scoped StAX reader for {@code data/xml/bigtransactions.xml}.
 *
 * <p>Each {@code <transaction>} fragment is unmarshalled into a
 * {@link Transaction} by an {@link XStreamMarshaller} configured with
 * element-name aliases and a date converter that accepts both
 * {@code yyyy-MM-dd HH:mm:ss} and {@code yyyy-MM-dd}.
 *
 * @return the configured XML reader, registered under the name
 *         {@code xmlFileTransactionReader}
 */
@Bean
@StepScope
public StaxEventItemReader<Transaction> xmlTransactionReader() {
    Resource resource = new FileSystemResource("data/xml/bigtransactions.xml");

    // Alias -> Java type. Typed as Class<?> rather than the raw Class type so
    // the map no longer triggers raw-type/unchecked warnings.
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("transaction", Transaction.class);
    aliases.put("account", String.class);
    aliases.put("amount", BigDecimal.class);
    aliases.put("timestamp", Date.class);

    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);
    // NOTE(review): newer XStream releases may reject types that are not
    // explicitly allowed; if unmarshalling fails with a forbidden-class error,
    // type permissions for Transaction need to be configured — confirm against
    // the XStream version on the classpath.

    // First argument is the default format; the array lists every accepted format.
    String[] formats = {"yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd"};
    marshaller.setConverters(new DateConverter("yyyy-MM-dd HH:mm:ss", formats));

    return new StaxEventItemReaderBuilder<Transaction>()
            .name("xmlFileTransactionReader")
            .resource(resource)
            .addFragmentRootElements("transaction")
            .unmarshaller(marshaller)
            .build();
}
/**
 * Step-scoped JDBC writer that inserts {@link Transaction} items into the
 * {@code TRANSACTION} table.
 *
 * <p>The statement uses the named parameters {@code :account},
 * {@code :amount} and {@code :timestamp}, and the writer is built in
 * bean-mapped mode.
 *
 * @param dataSource the data source qualified as {@code dataSource}
 * @return the configured batch writer
 */
@Bean
@StepScope
public JdbcBatchItemWriter<Transaction> jdbcBatchItemWriter(@Qualifier("dataSource") DataSource dataSource) {
    final String insertSql =
            "INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) VALUES (:account, :amount, :timestamp)";

    return new JdbcBatchItemWriterBuilder<Transaction>()
            .dataSource(dataSource)
            .beanMapped()
            .sql(insertSql)
            .build();
}
/**
 * Defines the job named {@code parallelJob}, whose only content is the flow
 * returned by {@link #parallelFlow()}.
 *
 * @return the assembled job
 */
@Bean("parallelJob")
public Job parallelStepsJob() {
    Flow splitFlow = parallelFlow();

    return jobs.get("parallelJob")
            .start(splitFlow)
            .end()
            .build();
}
/**
 * Builds the flow named {@code parallelFlow}, which splits into
 * {@link #flow1()} and {@link #flow2()} using a new
 * {@link SimpleAsyncTaskExecutor}.
 *
 * @return the split flow
 */
@Bean
public Flow parallelFlow() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    Flow xmlBranch = flow1();
    Flow fileBranch = flow2();

    return new FlowBuilder<Flow>("parallelFlow")
            .split(executor)
            .add(xmlBranch, fileBranch)
            .build();
}
/**
 * Builds the flow named {@code flow1}, consisting solely of {@link #step1()}.
 *
 * @return the single-step flow
 */
@Bean
public Flow flow1() {
    Step onlyStep = step1();
    return new FlowBuilder<Flow>("flow1").start(onlyStep).build();
}
/**
 * Builds the flow named {@code flow2}, consisting solely of {@link #step2()}.
 *
 * @return the single-step flow
 */
@Bean
public Flow flow2() {
    Step onlyStep = step2();
    return new FlowBuilder<Flow>("flow2").start(onlyStep).build();
}
/**
 * Defines the step named {@code xmlStep}: items come from
 * {@link #xmlTransactionReader()} and go to the JDBC batch writer, with a
 * chunk size of 1000.
 *
 * @return the configured step
 */
@Bean("xmlStep")
public Step step1() {
    final int chunkSize = 1000;
    StaxEventItemReader<Transaction> reader = xmlTransactionReader();
    // null is only a placeholder argument here; presumably the container
    // supplies the real DataSource for the writer bean — confirm.
    JdbcBatchItemWriter<Transaction> writer = jdbcBatchItemWriter(null);

    return steps.get("xmlStep")
            .<Transaction, Transaction>chunk(chunkSize)
            .reader(reader)
            .writer(writer)
            .build();
}
/**
 * Defines the step named {@code fileStep}: items come from
 * {@link #fileTransactionReader()} and go to the JDBC batch writer, with a
 * chunk size of 1000.
 *
 * @return the configured step
 */
@Bean("fileStep")
public Step step2() {
    final int chunkSize = 1000;
    FlatFileItemReader<Transaction> reader = fileTransactionReader();
    // null is only a placeholder argument here; presumably the container
    // supplies the real DataSource for the writer bean — confirm.
    JdbcBatchItemWriter<Transaction> writer = jdbcBatchItemWriter(null);

    return steps.get("fileStep")
            .<Transaction, Transaction>chunk(chunkSize)
            .reader(reader)
            .writer(writer)
            .build();
}
评论