前言
不知道大家看到这张图感觉怎么样,不是难,一共也没有几个组件,但是真的让我想当头疼,因为在面试的时候,就这张图,对,你没看错,就这几个组件,那是让我相当难受啊
MyBatis中SQL执行的整体过程
在 SqlSession 中,会将执行 SQL 的过程交由Executor执行器去执行,过程大致如下:
1、通过DefaultSqlSessionFactory创建与数据库交互的 SqlSession “会话”,其内部会创建一个Executor执行器对象
2、然后Executor执行器通过StatementHandler创建对应的java.sql.Statement对象,并通过ParameterHandler设置参数,然后执行数据库相关操作
如果是数据库更新操作,则可能需要通过KeyGenerator先设置自增键,然后返回受影响的 行数
如果是数据库查询操作,则需要将数据库返回的ResultSet结果集对象包装ResultSetWrapper,然后通过DefaultResultSetHandler对结果集进行映射,最后返回 Java 对象
上面还涉及到一级缓存、二级缓存和延迟加载等其他处理过程,下面我们来看一下具体的执行过程
SQL执行过程(一)之Executor
在MyBatis的SQL执行过程中,Executor执行器担当着一个重要的角色,相关操作都需要通过它来执行,相当于一个调度器,把SQL语句交给它,它来调用各个组件执行操作
其中一级缓存和二级缓存都是在Executor执行器中完成的
Executor执行器接口的实现类如下图所示:
org.apache.ibatis.executor.BaseExecutor:实现Executor接口,提供骨架方法,支持一级缓存,指定几个抽象的方法交由不同的子类去实现
org.apache.ibatis.executor.SimpleExecutor:继承 BaseExecutor 抽象类,简单的 Executor 实现类(默认)
org.apache.ibatis.executor.ReuseExecutor:继承 BaseExecutor 抽象类,可重用的 Executor 实现类,相比SimpleExecutor,在Statement执行完操作后不会立即关闭,而是缓存起来,执行的SQL作为key,下次执行相同的SQL时优先从缓存中获取Statement对象
org.apache.ibatis.executor.BatchExecutor:继承 BaseExecutor 抽象类,支持批量执行的 Executor 实现类
org.apache.ibatis.executor.CachingExecutor:实现 Executor 接口,支持二级缓存的 Executor 的实现类,实际采用了装饰器模式,装饰对象为左边三个Executor类
Executor
org.apache.ibatis.executor.Executor:执行器接口,代码如下:
public interface Executor {
/**
* ResultHandler 空对象
*/
ResultHandler NO_RESULT_HANDLER = null;
/**
* 更新或者插入或者删除
* 由传入的 MappedStatement 的 SQL 所决定
*/
int update(MappedStatement ms, Object parameter) throws SQLException;
/**
* 查询,带 ResultHandler + CacheKey + BoundSql
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler,
CacheKey cacheKey, BoundSql boundSql) throws SQLException;
/**
* 查询,带 ResultHandler
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler)
throws SQLException;
/**
* 查询,返回 Cursor 游标
*/
<E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
/**
* 刷入批处理语句
*/
List<BatchResult> flushStatements() throws SQLException;
/**
* 提交事务
*/
void commit(boolean required) throws SQLException;
/**
* 回滚事务
*/
void rollback(boolean required) throws SQLException;
/**
* 创建 CacheKey 对象
*/
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
/**
* 判断是否缓存
*/
boolean isCached(MappedStatement ms, CacheKey key);
/**
* 清除本地缓存
*/
void clearLocalCache();
/**
* 延迟加载
*/
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
/**
* 获得事务
*/
Transaction getTransaction();
/**
* 关闭事务
*/
void close(boolean forceRollback);
/**
* 判断事务是否关闭
*/
boolean isClosed();
/**
* 设置包装的 Executor 对象
*/
void setExecutorWrapper(Executor executor);
}
执行器接口定义了操作数据库的相关方法:
数据库的读和写操作
事务相关
缓存相关
设置延迟加载
设置包装的 Executor 对象
BaseExecutor
org.apache.ibatis.executor.BaseExecutor:实现Executor接口,提供骨架方法,指定几个抽象的方法交由不同的子类去实现,例如:
protected abstract int doUpdate(MappedStatement ms, Object parameter) throws SQLException;
protected abstract List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException;
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds,
ResultHandler resultHandler, BoundSql boundSql) throws SQLException;
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds,
BoundSql boundSql) throws SQLException;
上面这四个方法交由不同的子类去实现,分别是:更新数据库、刷入批处理语句、查询数据库和查询数据返回游标
构造方法
public abstract class BaseExecutor implements Executor {
private static final Log log = LogFactory.getLog(BaseExecutor.class);
/**
* 事务对象
*/
protected Transaction transaction;
/**
* 包装的 Executor 对象
*/
protected Executor wrapper;
/**
* DeferredLoad(延迟加载)队列
*/
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
/**
* 本地缓存,即一级缓存,内部就是一个 HashMap 对象
*/
protected PerpetualCache localCache;
/**
* 本地输出类型参数的缓存,和存储过程有关
*/
protected PerpetualCache localOutputParameterCache;
/**
* 全局配置
*/
protected Configuration configuration;
/**
* 记录当前会话正在查询的数量
*/
protected int queryStack;
/**
* 是否关闭
*/
private boolean closed;
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
}
其中上面的属性可根据注释进行查看
一级缓存
这里提一下localCache属性,本地缓存,用于一级缓存,MyBatis的一级缓存是什么呢?
每当我们使用 MyBatis 开启一次和数据库的会话,MyBatis 都会创建出一个 SqlSession 对象,表示与数据库的一次会话,而每个 SqlSession 都会创建一个 Executor 对象
在对数据库的一次会话中,我们有可能会反复地执行完全相同的查询语句,每一次查询都会访问一次数据库,如果在极短的时间内做了完全相同的查询,那么它们的结果极有可能完全相同,由于查询一次数据库的代价很大,如果不采取一些措施的话,可能造成很大的资源浪费
为了解决这一问题,减少资源的浪费,MyBatis 会在每一次 SqlSession 会话对象中建立一个简单的缓存,将每次查询到的结果缓存起来,当下次查询的时候,如果之前已有完全一样的查询,则会先尝试从这个简单的缓存中获取结果返回给用户,不需要再进行一次数据库查询了 注意,这个“简单的缓存”就是一级缓存,且默认开启,无法“关闭”
MyBatis 的一次会话:在一个 SqlSession 会话对象中创建一个localCache本地缓存,对于每一次查询,都会根据查询条件尝试去localCache本地缓存中获取缓存数据,如果存在,就直接从缓存中取出数据然后返回给用户,否则访问数据库进行查询,将查询结果存入缓存并返回给用户(如果设置的缓存区域为STATEMENT,默认为SESSION,在一次会话中所有查询执行后会清空当前 SqlSession 会话中的localCache本地缓存,相当于“关闭”了一级缓存)
所有的数据库更新操作都会清空当前 SqlSession 会话中的本地缓存
如上描述,MyBatis的一级缓存在多个 SqlSession 会话时,可能导致数据的不一致性,某一个 SqlSession 更新了数据而其他 SqlSession 无法获取到更新后的数据,出现数据不一致性,这种情况是不允许出现了,所以我们通常选择“关闭”一级缓存
clearLocalCache方法
clearLocalCache()方法,清空一级(本地)缓存,如果全局配置中设置的localCacheScope缓存区域为STATEMENT(默认为SESSION),则在每一次查询后会调用该方法,相当于关闭了一级缓存,代码如下:
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
createCacheKey方法
createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql)方法,根据本地查询的相关信息创建一个CacheKey缓存key对象,代码如下:
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// <1> 创建 CacheKey 对象
CacheKey cacheKey = new CacheKey();
// <2> 设置 id、offset、limit、sql 到 CacheKey 对象中
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
// <3> 设置 ParameterMapping 数组的元素对应的每个 value 到 CacheKey 对象中
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) { // 该参数需要作为入参
Object value;
String propertyName = parameterMapping.getProperty();
/*
* 获取该属性值
*/
if (boundSql.hasAdditionalParameter(propertyName)) {
// 从附加参数中获取
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
// 入参对象为空则直接返回 null
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
// 入参有对应的类型处理器则直接返回该参数
value = parameterObject;
} else {
// 从入参对象中获取该属性的值
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
// <4> 设置 Environment.id 到 CacheKey 对象中
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
创建一个CacheKey实例对象
将入参中的id、offset、limit、sql,通过CacheKey的update方法添加到其中,它的方法如下:public void update(Object object) { // 方法参数 object 的 hashcode int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object); this.count++; // checksum 为 baseHashCode 的求和 this.checksum += baseHashCode; // 计算新的 hashcode 值 baseHashCode *= this.count; this.hashcode = this.multiplier * this.hashcode + baseHashCode; // 添加 object 到 updateList 中 this.updateList.add(object); }
获取本次查询的入参值,通过CacheKey的update方法添加到其中
获取本次环境的Environment.id,通过CacheKey的update方法添加到其中
返回CacheKey实例对象,这样就可以为本次查询生成一个唯一的缓存key对象,可以看看CacheKey重写的equal方法:
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
} if (!(object instanceof CacheKey)) {
return false;
} final
CacheKey cacheKey = (CacheKey) object;
if (hashcode != cacheKey.hashcode) {
return false;
} if (checksum != cacheKey.checksum) {
return false;
} if (count != cacheKey.count) {
return false;
} for (int i = 0; i < updateList.size(); i++) {
Object thisObject = updateList.get(i);
Object thatObject = cacheKey.updateList.get(i);
if (!ArrayUtil.equals(thisObject, thatObject)) {
return false;
}
} return true;
}
query相关方法
查询数据库因为涉及到一级缓存,所以这里有多层方法,最终访问数据库的doQuery方法是交由子类去实现的,总共分为三层:
1、根据入参获取BoundSql和CacheKey对象,然后再去调用查询方法
2、涉及到一级缓存和延迟加载的处理,缓存未命中则再去调用查询数据库的方法
3、保存一些信息供一级缓存使用,内部调用doQuery方法执行数据库的读操作
接下来我们分别来看看这三个方法
① 数据库查询操作的入口
代码格式:query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler)方法,代码如下
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler)
throws SQLException {
// <1> 获得 BoundSql 对象
BoundSql boundSql = ms.getBoundSql(parameter);
// <2> 创建 CacheKey 对象
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
// <3> 查询
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
通过MappedStatement对象根据入参获取BoundSql对象,如果是动态SQL则需要进行解析,获取到最终的SQL,替换成?占位符
调用createCacheKey方法为本次查询创建一个CacheKey对象
继续调用query(...)方法执行查询
② 处理数据库查询操作,涉及到一级缓存
代码格式:query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)方法,代码如下:
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler,
CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
// <1> 已经关闭,则抛出 ExecutorException 异常
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// <2> 清空本地缓存,如果 queryStack 为零,并且要求清空本地缓存(配置了 flushCache = true)
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
// <3> queryStack + 1
queryStack++;
// <4> 从一级缓存中,获取查询结果
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) { // <4.1> 获取到,则进行处理
// 处理缓存存储过程的结果
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else { // <4.2> 获得不到,则从数据库中查询
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
// <5> queryStack - 1
queryStack--;
}
if (queryStack == 0) { // <6> 如果当前会话的所有查询执行完了
// <6.1> 执行延迟加载
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
// <6.2> 清空 deferredLoads
deferredLoads.clear();
// <6.3> 如果缓存级别是 LocalCacheScope.STATEMENT ,则进行清理
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
// <7> 返回查询结果
return list;
}
当前会话已经被关闭则抛出异常
如果queryStack为0(表示是当前会话只有本次查询而没有其他的查询了),并且要求清空本地缓存(配置了flushCache=true),那么直接清空一级(本地)缓存
当前会话正在查询的数量加一,queryStack++
从localCache一级缓存获取缓存的查询结果如果有缓存数据,则需要处理储存过程的情况,将需要作为出参(OUT)的参数设置到本次查询的入参的属性中如果没有缓存数据,则调用queryFromDatabase方法,执行数据库查询操作
当前会话正在查询的数量减一,queryStack--
如果当前会话所有查询都执行完执行当前会话中的所有的延迟加载deferredLoads,这种延迟加载属于查询后的延迟,和后续讲到的获取属性时再加载不同,这里的延迟加载是在哪里生成的呢?在DefaultResultSetHandler中进行结果映射时,如果某个属性配置的是子查询,并且本次的子查询在一级缓存中有缓存数据,那么将会创建一个DeferredLoad对象保存在deferredLoads中,该属性值先设置为DEFERRED延迟加载对象(final修饰的Object对象),待当前会话所有的查询结束后,也就是当前执行步骤,则会从一级缓存获取到数据设置到返回结果中清空所有的延迟加载deferredLoads对象如果全局配置的缓存级别为STATEMENT(默认为SESSION),则清空当前会话中一级缓存的所有数据
返回查询结果
③ 执行数据库查询操作
代码格式:queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)方法,代码如下:
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
// <1> 在缓存中,添加正在执行的占位符对象,因为正在执行的查询不允许提前加载需要延迟加载的属性,可见 DeferredLoad#canLoad() 方法
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
// <2> 执行读操作
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
// <3> 从缓存中,移除占位对象
localCache.removeObject(key);
}
// <4> 添加到缓存中
localCache.putObject(key, list);
// <5> 如果是存储过程,则将入参信息保存保存,跟一级缓存处理存储过程相关
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
// <6> 返回查询结果
return list;
}
在缓存中,添加正在执行的EXECUTION_PLACEHOLDER占位符对象,因为正在执行的查询不允许提前加载需要延迟加载的属性,可见 DeferredLoad#canLoad() 方法
调用查询数据库doQuery方法,该方法交由子类实现
删除第1步添加的占位符
将查询结果添加到localCache一级缓存中
如果是存储过程,则将入参信息保存保存,跟一级缓存处理存储过程相关,可见上面的第②个方法的第4.1步
返回查询结果
update方法
update(MappedStatement ms, Object parameter)方法,执行更新数据库的操作,代码如下:
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
// <1> 已经关闭,则抛出 ExecutorException 异常
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// <2> 清空本地缓存
clearLocalCache();
// <3> 执行写操作
return doUpdate(ms, parameter);
}
当前会话已经被关闭则抛出异常
清空当前会话中一级缓存的所有数据
调用更新数据库doUpdate方法,该方法交由子类实现
其他方法
除了上面介绍的几个重要的方法以外,还有其他很多方法,例如获取当前事务,提交事务,回滚事务,关闭会话等等,这里我就不一一列出来了,大家可以自行阅读类源码,我下面会进行涉及,但是不会那么精细
二级缓存
问题
在讲到的一级缓存中,缓存数据仅在当前的 SqlSession 会话中进行共享,可能会导致多个 SqlSession 出现数据不一致性的问题
如果需要在多个 SqlSession 之间需要共享缓存数据,则需要使用到二级缓存
开启二级缓存后,会使用CachingExecutor对象装饰其他的Executor类,这样会先在CachingExecutor进行二级缓存的查询,缓存未命中则进入装饰的对象中,进行一级缓存的查询
流程如下图所示:
在全局配置对象中cacheEnabled是否开启缓存属性默认为true,可以在mybatis-config.xml配置文件中添加以下配置关闭:
<configuration>
<settings>
<setting name="cacheEnabled" value="false" />
</settings>
</configuration>
我们来看看MyBatis是如何实现二级缓存的
CachingExecutor
org.apache.ibatis.executor.CachingExecutor:实现 Executor 接口,支持二级缓存的 Executor 的实现类
构造方法
public class CachingExecutor implements Executor {
/**
* 被委托的 Executor 对象
*/
private final Executor delegate;
/**
* TransactionalCacheManager 对象
*/
private final TransactionalCacheManager tcm = new TransactionalCacheManager();
public CachingExecutor(Executor delegate) {
this.delegate = delegate;
// 设置 delegate 被当前执行器所包装
delegate.setExecutorWrapper(this);
}
}
query方法
处理数据库查询操作的方法,涉及到二级缓存,会将Cache二级缓存对象装饰成TransactionalCache对象并存放在TransactionalCacheManager管理器中,代码如下:
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
// <1> 获取 Cache 二级缓存对象
Cache cache = ms.getCache();
// <2> 如果配置了二级缓存
if (cache != null) {
// <2.1> 如果需要清空缓存,则进行清空
flushCacheIfRequired(ms);
// <2.2> 如果当前操作需要使用缓存(默认开启)
if (ms.isUseCache() && resultHandler == null) {
// <2.2.1> 如果是存储过程相关操作,保证所有的参数模式为 ParameterMode.IN
ensureNoOutParams(ms, boundSql);
// <2.2.2> 从二级缓存中获取结果,会装饰成 TransactionalCache
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
// <2.2.3> 如果不存在,则从数据库中查询
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
// <2.2.4> 将缓存结果保存至 TransactionalCache
tcm.putObject(cache, key, list); // issue #578 and #116
}
// <2.2.5> 直接返回结果
return list;
}
}
// <3> 没有使用二级缓存,则调用委托对象的方法
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
获取Cache二级缓存对象
如果该对象不为空,表示配置了二级缓存如果需要清空缓存,则进行清空如果当前操作需要使用缓存(默认开启)如果是存储过程相关操作,保证所有的参数模式为ParameterMode.IN通过TransactionalCacheManager从二级缓存中获取结果,会装饰成TransactionalCach对象如果缓存未命中,则调用委托对象的query方法将缓存结果保存至TransactionalCache对象中,并未真正的保存至Cache二级缓存中,需要待事务提交才会保存过去,其中缓存未命中的也会设置缓存结果为null直接返回结果
没有使用二级缓存,则调用委托对象的方法
update方法
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
// 如果需要清空缓存,则进行清空
flushCacheIfRequired(ms);
// 执行 delegate 对应的方法
return delegate.update(ms, parameterObject);
}
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
tcm.clear(cache);
}
}
数据库的更新操作,如果配置了需要清空缓存,则清空二级缓存
这里就和一级缓存不同,一级缓存是所有的更新操作都会清空一级缓存
commit方法
@Override
public void commit(boolean required) throws SQLException {
// 执行 delegate 对应的方法
delegate.commit(required);
// 提交 TransactionalCacheManager
tcm.commit();
}
在事务提交后,通过TransactionalCacheManager二级缓存管理器,将本次事务生成的缓存数据从TransactionalCach中设置到正真的Cache二级缓存中
rollback方法
@Override
public void rollback(boolean required) throws SQLException {
try {
// 执行 delegate 对应的方法
delegate.rollback(required);
} finally {
if (required) {
// 回滚 TransactionalCacheManager
tcm.rollback();
}
}
}
在事务回滚后,如果需要的话,通过TransactionalCacheManager二级缓存管理器,将本次事务生成的缓存数据从TransactionalCach中移除
close方法
@Override
public void close(boolean forceRollback) {
try {
// issues #499, #524 and #573
if (forceRollback) {
tcm.rollback();
} else {
tcm.commit();
}
} finally {
delegate.close(forceRollback);
}
}
在事务关闭前,如果是强制回滚操作,则TransactionalCacheManager二级缓存管理器,将本次事务生成的缓存数据从TransactionalCach中移除,否则还是将缓存数据设置到正真的Cache二级缓存中
TransactionalCacheManager
org.apache.ibatis.cache.TransactionalCacheManager:二级缓存管理器,因为二级缓存是支持跨 SqlSession 共享的,所以需要通过它来实现,当事务提交时,才将当前事务中查询时产生的缓存,同步到二级缓存中,代码如下:
public class TransactionalCacheManager {
/**
* Cache 和 TransactionalCache 的映射
*/
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}
public Object getObject(Cache cache, CacheKey key) {
return getTransactionalCache(cache).getObject(key);
}
public void putObject(Cache cache, CacheKey key, Object value) {
// 首先,获得 Cache 对应的 TransactionalCache 对象
// 然后,添加 KV 到 TransactionalCache 对象中
getTransactionalCache(cache).putObject(key, value);
}
public void commit() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.commit();
}
}
public void rollback() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.rollback();
}
}
private TransactionalCache getTransactionalCache(Cache cache) {
return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
}
}
getTransactionalCache(Cache cache)方法,根据Cache二级缓存对象获取对应的TransactionalCache对象,如果没有则创建一个保存起来
getObject(Cache cache, CacheKey key)方法,会先调用getTransactionalCache(Cache cache)方法获取对应的TransactionalCache对象,然后根据CacheKey从该对象中获取缓存结果
putObject(Cache cache, CacheKey key, Object value)方法,同样也先调用getTransactionalCache(Cache cache)方法获取对应的TransactionalCache对象,根据该对象将结果进行缓存
commit()方法,遍历transactionalCaches,依次调用TransactionalCache的提交方法
rollback()方法,遍历transactionalCaches,依次调用TransactionalCache的回滚方法
TransactionalCache
org.apache.ibatis.cache.decorators.TransactionalCache:用来装饰二级缓存的对象,作为二级缓存一个事务的缓冲区
在一个SqlSession会话中,该类包含所有需要添加至二级缓存的的缓存数据,当提交事务后会全部刷出到二级缓存中,或者事务回滚后移除这些缓存数据,代码如下:
public class TransactionalCache implements Cache {
private static final Log log = LogFactory.getLog(TransactionalCache.class);
/**
* 委托的 Cache 对象。
*
* 实际上,就是二级缓存 Cache 对象。
*/
private final Cache delegate;
/**
* 提交时,清空 {@link #delegate}
*
* 初始时,该值为 false
* 清理后{@link #clear()} 时,该值为 true ,表示持续处于清空状态
*
* 因为可能事务还未提交,所以不能直接清空所有的缓存,而是设置一个标记,获取缓存的时候返回 null 即可
* 先清空下面这个待提交变量,待事务提交的时候才真正的清空缓存
*
*/
private boolean clearOnCommit;
/**
* 待提交的 Key-Value 映射
*/
private final Map<Object, Object> entriesToAddOnCommit;
/**
* 查找不到的 KEY 集合
*/
private final Set<Object> entriesMissedInCache;
public TransactionalCache(Cache delegate) {
this.delegate = delegate;
this.clearOnCommit = false;
this.entriesToAddOnCommit = new HashMap<>();
this.entriesMissedInCache = new HashSet<>();
}
@Override
public Object getObject(Object key) {
// issue #116
// <1> 从 delegate 中获取 key 对应的 value
Object object = delegate.getObject(key);
if (object == null) {// <2> 如果不存在,则添加到 entriesMissedInCache 中
entriesMissedInCache.add(key);
}
// issue #146
if (clearOnCommit) {// <3> 如果 clearOnCommit 为 true ,表示处于持续清空状态,则返回 null
return null;
} else {
return object;
}
}
@Override
public void putObject(Object key, Object object) {
// 暂存 KV 到 entriesToAddOnCommit 中
entriesToAddOnCommit.put(key, object);
}
@Override
public void clear() {
// <1> 标记 clearOnCommit 为 true
clearOnCommit = true;
// <2> 清空 entriesToAddOnCommit
entriesToAddOnCommit.clear();
}
public void commit() {
// <1> 如果 clearOnCommit 为 true ,则清空 delegate 缓存
if (clearOnCommit) {
delegate.clear();
}
// 将 entriesToAddOnCommit、entriesMissedInCache 刷入 delegate 中
flushPendingEntries();
// 重置
reset();
}
public void rollback() {
// <1> 从 delegate 移除出 entriesMissedInCache
unlockMissedEntries();
// <2> 重置
reset();
}
private void reset() {
clearOnCommit = false;
entriesToAddOnCommit.clear();
entriesMissedInCache.clear();
}
private void flushPendingEntries() {
for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
delegate.putObject(entry.getKey(), entry.getValue());
}
for (Object entry : entriesMissedInCache) {
if (!entriesToAddOnCommit.containsKey(entry)) {
delegate.putObject(entry, null);
}
}
}
private void unlockMissedEntries() {
for (Object entry : entriesMissedInCache) {
try {
delegate.removeObject(entry);
} catch (Exception e) {
log.warn("Unexpected exception while notifiying a rollback to the cache adapter."
+ "Consider upgrading your cache adapter to the latest version. Cause: " + e);
}
}
}
}
根据上面的注释查看每个属性的作用,我们依次来看下面的方法,看看在不同事务之前是如何处理二级缓存的
putObject(Object key, Object object)方法,添加缓存数据时,先把缓存数据保存在entriesToAddOnCommit中,这个对象属于当前事务,事务还未提交,其他事务是不能访问到的
clear()方法,设置clearOnCommit标记为true,告诉当前事务正处于持续清空状态,先把entriesToAddOnCommit清空,也就是当前事务中还未提交至二级缓存的缓存数据,事务还未提交,不能直接清空二级缓存中的数据,否则影响到其他事务了
commit()方法,事务提交后,如果clearOnCommit为true,表示正处于持续清空状态,需要先把二级缓存中的数据全部清空,然后再把当前事务生成的缓存设置到二级缓存中,然后重置当前对象这里为什么处于清空状态把二级缓存的数据清空后,还要将当前事务生成的缓存数据再设置到二级缓存中呢?因为当前事务调用clear()方法后可能有新生成了新的缓存数据,而不能把这些忽略掉
getObject(Object key)方法先从delegate二级缓存对象中获取结果如果缓存未命中则将该key添加到entriesMissedInCache属性中,因为二级缓存也会将缓存未命中的key起来,数据为null如果clearOnCommit为true,即使你缓存命中了也返回null,因为触发clear()方法的话,本来需要清空二级缓存的,但是事务还未提交,所以先标记一个缓存持续清理的这么一个状态,这样相当于在当前事务中既清空了二级缓存数据,也不影响其他事务的二级缓存数据返回获取到的结果,可能为null
Executor在哪被创建
前面对Executor执行器接口以及实现类都有分析过,那么它是在哪创建的呢?
public class DefaultSqlSessionFactory implements SqlSessionFactory {
private final Configuration configuration;
@Override
public SqlSession openSession() {
return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
}
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level,
boolean autoCommit) {
Transaction tx = null;
try {
// 获得 Environment 对象
final Environment environment = configuration.getEnvironment();
// 创建 Transaction 对象
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
// 创建 Executor 对象
final Executor executor = configuration.newExecutor(tx, execType);
// 创建 DefaultSqlSession 对象
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
// 如果发生异常,则关闭 Transaction 对象
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
}
我们所有的数据库操作都是在MyBatis的一个SqlSession会话中执行的,在它被创建的时候,会先通过Configuration全局配置对象的newExecutor方法创建一个Executor执行器
newExecutor(Transaction transaction, ExecutorType executorType)方法,根据执行器类型创建执行Executor执行器,代码如下:
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
// <1> 获得执行器类型
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
// <2> 创建对应实现的 Executor 对象
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
// <3> 如果开启缓存,创建 CachingExecutor 对象,进行包装
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
// <4> 应用插件
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
获得执行器类型,默认为SIMPLE
创建对应的Executor对象,默认就是SimpleExecutor执行器了
如果全局配置了开启二级缓存,则将Executor对象,封装成CachingExecutor对象
插件链应用该对象,在后续会讲到
啊,不知不觉2W字了,写源码的东西是真的浪费时间的好东西啊,比刷抖音都消耗时间。但是也是真的香,看着自己整理的这些笔记,那是相当有成就感
最后,希望各位在学习的过程中,能够重视一下源码的阅读,因为源码中很多的方法其实都已经注释好了,很方便阅读
评论