简介
之前我们提到过,对于底层的数据源来说,MongoDB, Redis, 和 Cassandra 可以直接以 reactive 的方式支持 Spring Data。而其他很多关系型数据库比如 Postgres, Microsoft SQL Server, MySQL, H2 和 Google Spanner 则可以通过使用 R2DBC 来实现对 reactive 的支持。
今天我们就来具体讲解一下 R2DBC 的使用。
R2DBC 介绍
之前我们介绍了 Reactor 还有基于其之上的 Spring WebFlux 框架。包括 vert.x,rxjava 等等 reactive 技术。我们实际上在应用层已经有很多优秀的响应式处理框架。
但是有一个问题就是所有的框架都需要获取底层的数据,而基本上关系型数据库的底层读写都还是同步的。
为了解决这个问题,出现了两个标准,一个是 oracle 提出的 ADBC (Asynchronous Database Access API),另一个就是 Pivotal 提出的 R2DBC (Reactive Relational Database Connectivity)。
R2DBC 是基于 Reactive Streams 标准来设计的。通过使用 R2DBC,你可以使用 reactive API 来操作数据。
同时 R2DBC 只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。
今天我们以 r2dbc-h2 为例,讲解一下 r2dbc 在 Spring webFlux 中的使用。
项目依赖
我们需要引入 r2dbc-spi 和 r2dbc-h2 两个库,其中 r2dbc-spi 是接口,而 r2dbc-h2 是具体的实现。
同时我们使用了 Spring webflux,所以还需要引入 spring-boot-starter-webflux。
具体的依赖如下:
<!-- R2DBC H2 Driver -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>${r2dbc-h2.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
复制代码
创建 ConnectionFactory
ConnectionFactory 是数据库连接的一个具体实现,通过 ConnectionFactory 我们可以创建到数据库的连接。
先看一下数据库的配置文件,为了方便起见,这里我们使用的是内存数据库 H2 :
r2dbc.url=r2dbc:h2:mem://./r2dbc
r2dbc.user=sa
r2dbc.password=password
复制代码
第一个 url 指定的是数据库的连接方式,下面两个是数据库的用户名和密码。
接下来我们看一下,怎么通过这些属性来创建 ConnectionFactory:
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url);
ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
if (!StringUtil.isNullOrEmpty(user)) {
ob = ob.option(USER, user);
}
if (!StringUtil.isNullOrEmpty(password)) {
ob = ob.option(PASSWORD, password);
}
return ConnectionFactories.get(ob.build());
}
复制代码
通过 url 可以 parse 得到 ConnectionFactoryOptions。然后通过 ConnectionFactories 的 get 方法创建 ConnectionFactory。
如果我们设置了 USER 或者 PASSWORD,还可以加上这两个配置。
创建 Entity Bean
这里,我们创建一个简单的 User 对象:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Users {
private Long id;
private String firstname;
private String lastname;
}
复制代码
初始化数据库
虽然 H5 有很多更加简单的方式来初始化数据库,比如直接读取 SQL 文件,这里为了说明 R2DBC 的使用,我们使用手动的方式来创建:
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
return (args) ->
Flux.from(cf.create())
.flatMap(c ->
Flux.from(c.createBatch()
.add("drop table if exists Users")
.add("create table Users(" +
"id IDENTITY(1,1)," +
"firstname varchar(80) not null," +
"lastname varchar(80) not null)")
.add("insert into Users(firstname,lastname)" +
"values('flydean','ma')")
.add("insert into Users(firstname,lastname)" +
"values('jacken','yu')")
.execute())
.doFinally((st) -> c.close())
)
.log()
.blockLast();
}
复制代码
上面的代码中,我们使用 c.createBatch()来向数据库插入一些数据。
除了 createBatch,还可以使用 create 来创建单个的执行语句。
获取所有的用户
在 Dao 中,我们提供了一个 findAll 的方法:
public Flux<Users> findAll() {
return Mono.from(connectionFactory.create())
.flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users")
.execute())
.doFinally((st) -> close(c)))
.flatMapMany(result -> Flux.from(result.map((row, meta) -> {
Users acc = new Users();
acc.setId(row.get("id", Long.class));
acc.setFirstname(row.get("firstname", String.class));
acc.setLastname(row.get("lastname", String.class));
return acc;
})));
}
复制代码
简单解释一下上面的使用。
因为是一个 findAll 方法,我们需要找出所有的用户信息。所以我们返回的是一个 Flux 而不是一个 Mono。
怎么从 Mono 转换成为一个 Flux 呢?
这里我们使用的是 flatMapMany,将 select 出来的结果,分成一行一行的,最后转换成为 Flux。
Prepare Statement
为了防止 SQL 注入,我们需要在 SQL 中使用 Prepare statement:
public Mono<Users> findById(long id) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1")
.bind("$1", id)
.execute())
.doFinally((st) -> close(c)))
.map(result -> result.map((row, meta) ->
new Users(row.get("id", Long.class),
row.get("firstname", String.class),
row.get("lastname", String.class))))
.flatMap( p -> Mono.from(p));
}
复制代码
看下我们是怎么在 R2DBC 中使用 prepare statement 的。
事务处理
接下来我们看一下怎么在 R2DBC 中使用事务:
public Mono<Users> createAccount(Users account) {
return Mono.from(connectionFactory.create())
.flatMap(c -> Mono.from(c.beginTransaction())
.then(Mono.from(c.createStatement("insert into Users(firstname,lastname) values($1,$2)")
.bind("$1", account.getFirstname())
.bind("$2", account.getLastname())
.returnGeneratedValues("id")
.execute()))
.map(result -> result.map((row, meta) ->
new Users(row.get("id", Long.class),
account.getFirstname(),
account.getLastname())))
.flatMap(pub -> Mono.from(pub))
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close()));
}
复制代码
上面的代码中,我们使用了事务,具体的代码有两部分:
c -> Mono.from(c.beginTransaction())
.delayUntil(r -> c.commitTransaction())
复制代码
开启是的时候需要使用 beginTransaction,后面提交就需要调用 commitTransaction。
WebFlux 使用
最后,我们需要创建 WebFlux 应用来对外提供服务:
@GetMapping("/users/{id}")
public Mono<ResponseEntity<Users>> getUsers(@PathVariable("id") Long id) {
return usersDao.findById(id)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
.switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
}
@GetMapping("/users")
public Flux<Users> getAllAccounts() {
return usersDao.findAll();
}
@PostMapping("/createUser")
public Mono<ResponseEntity<Users>> createUser(@RequestBody Users user) {
return usersDao.createAccount(user)
.map(acc -> new ResponseEntity<>(acc, HttpStatus.CREATED))
.log();
}
复制代码
执行效果
最后,我们运行一下代码,执行下 users:
curl "localhost:8080/users"
[{"id":1,"firstname":"flydean","lastname":"ma"},{"id":2,"firstname":"jacken","lastname":"yu"}]%
复制代码
完美,实验成功。
本文的代码:webflux-with-r2dbc
本文作者:flydean 程序那些事
本文链接:http://www.flydean.com/r2dbc-introduce/
本文来源:flydean 的博客
欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
评论 (3 条评论)