springboot3+r2dbc——响应式编程实践
- 2022 年 2 月 17 日
本文字数:7552 字
阅读完需:约 25 分钟
Spring boot3
已经M1
了,最近群佬们也开始蠢蠢欲动的开始整活Reactive
+Spring Boot3
,跟着大家的步伐,我也来整一篇工程入门,我们将用java17
+Spring Boot3
+r2dbc
+Reactive
栈来讲述,欢迎大家来讨论。(关于响应式,请大家异步到之前的文章里,有详细介绍。)
r2dbc
Reactor
还有基于其之上的Spring WebFlux
框架。包括vert.x
,rxjava
等等reactive
技术。我们实际上在应用层已经有很多优秀的响应式处理框架。
但是有一个问题就是所有的框架都需要获取底层的数据,而基本上关系型数据库的底层读写都还是同步的。
为了解决这个问题,出现了两个标准,一个是oracle
提出的 ADBC
(Asynchronous Database Access API),另一个就是Pivotal
提出的R2DBC
(Reactive Relational Database Connectivity)。
R2DBC
是基于Reactive Streams
标准来设计的。通过使用R2DBC
,你可以使用reactive API
来操作数据。
同时R2DBC
只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。
今天我们以r2dbc-h2
为例,讲解一下r2dbc
在Spring webFlux
中的使用。
工程依赖
以下是 pom.xml
清单
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.0-M1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>wang.datahub</groupId>
<artifactId>springboot3demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot3demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-groovy-templates</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<!-- <version>3.4.14</version>-->
<!-- <scope>compile</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</project>
配置文件
这里我们只配置了 r2dbc 链接信息
r2dbc:
url: r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
配置类
用于配置默认链接,创建初始化数据
package wang.datahub.springboot3demo.config;
import io.netty.util.internal.StringUtil;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;
@Configuration
@ConfigurationProperties(prefix = "r2dbc")
public class DBConfig {
private String url;
private String user;
private String password;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Bean
public ConnectionFactory connectionFactory() {
System.out.println("url ==> "+url);
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url);
ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(user)) {
ob = ob.option(USER, user);
}
if (!StringUtil.isNullOrEmpty(password)) {
ob = ob.option(PASSWORD, password);
}
return ConnectionFactories.get(ob.build());
}
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Users")
.add("create table Users(" +
"id IDENTITY(1,1)," +
"firstname varchar(80) not null," +
"lastname varchar(80) not null)")
.add("insert into Users(firstname,lastname)" +
"values('Jacky','Li')")
.add("insert into Users(firstname,lastname)" +
"values('Doudou','Li')")
.add("insert into Users(firstname,lastname)" +
"values('Maimai','Li')")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}
}
bean
创建用户 bean
package wang.datahub.springboot3demo.bean;
import org.springframework.data.annotation.Id;
public class Users {
@Id
private Long id;
private String firstname;
private String lastname;
public Users(){
}
public Users(Long id, String firstname, String lastname) {
this.id = id;
this.firstname = firstname;
this.lastname = lastname;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getFirstname() {
return firstname;
}
public void setFirstname(String firstname) {
this.firstname = firstname;
}
public String getLastname() {
return lastname;
}
public void setLastname(String lastname) {
this.lastname = lastname;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", firstname='" + firstname + '\'' +
", lastname='" + lastname + '\'' +
'}';
}
}
DAO
dao 代码清单如下,包含查询列表、按 id 查询,以及创建用户等操作
package wang.datahub.springboot3demo.dao;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.relational.core.query.Query;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.springboot3demo.bean.Users;
import static org.springframework.data.r2dbc.query.Criteria.where;
import static org.springframework.data.relational.core.query.Query.query;
@Component
public class UsersDao {
private ConnectionFactory connectionFactory;
private R2dbcEntityTemplate template;
public UsersDao(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.template = new R2dbcEntityTemplate(connectionFactory);
}
public Mono<Users> findById(long id) {
return this.template.selectOne(query(where("id").is(id)),Users.class);
// return Mono.from(connectionFactory.create())
// .flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1")
// .bind("$1", id)
// .execute())
// .doFinally((st) -> close(c)))
// .map(result -> result.map((row, meta) ->
// new Users(row.get("id", Long.class),
// row.get("firstname", String.class),
// row.get("lastname", String.class))))
// .flatMap( p -> Mono.from(p));
}
public Flux<Users> findAll() {
return this.template.select(Users.class).all();
// return Mono.from(connectionFactory.create())
// .flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users")
// .execute())
// .doFinally((st) -> close(c)))
// .flatMapMany(result -> Flux.from(result.map((row, meta) -> {
// Users acc = new Users();
// acc.setId(row.get("id", Long.class));
// acc.setFirstname(row.get("firstname", String.class));
// acc.setLastname(row.get("lastname", String.class));
// return acc;
// })));
}
public Mono<Users> createAccount(Users account) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Users(firstname,lastname) values($1,$2)")
.bind("$1", account.getFirstname())
.bind("$2", account.getLastname())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Users(row.get("id", Long.class),
account.getFirstname(),
account.getLastname())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}
private <T> Mono<T> close(Connection connection) {
return Mono.from(connection.close())
.then(Mono.empty());
}
}
controller
controller 代码清单如下,包含了查询列表、按 id 查询,以及创建用户等操作
package wang.datahub.springboot3demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.springboot3demo.bean.Users;
import wang.datahub.springboot3demo.dao.UsersDao;
@RestController
public class UsersController {
@Autowired
private final UsersDao usersDao;
public UsersController(UsersDao usersDao) {
this.usersDao = usersDao;
}
@GetMapping("/users/{id}")
public Mono<ResponseEntity<Users>> getUsers(@PathVariable("id") Long id) {
return usersDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
@GetMapping("/users")
public Flux<Users> getAllAccounts() {
return usersDao.findAll();
}
@PostMapping("/createUser")
public Mono<ResponseEntity<Users>> createUser(@RequestBody Users user) {
return usersDao.createAccount(user)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.CREATED))
.log();
}
}
启动类清单:
package wang.datahub.springboot3demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import wang.datahub.springboot3demo.config.DBConfig;
@SpringBootApplication
@EnableConfigurationProperties(DBConfig.class)
public class WebFluxR2dbcApp {
public static void main(String[] args) {
SpringApplication.run(WebFluxR2dbcApp.class, args);
}
}
好了,致此我们整个 Demo
就实现完成了
参考链接:
麒思妙想
欢迎关注同名公众号,麒思妙想 2018.04.27 加入
还未添加个人简介
评论