大家好,我是洋仔
,JanusGraph图解系列文章,实时更新
~
图数据库文章总目录:
**源码分析相关可查看github(码文不易,求个star~)
**: https://github.com/YYDreamer/janusgraph
下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629
版本:JanusGraph-0.5.2
转载文章请保留以下声明:
作者:洋仔聊编程
微信公众号:匠心Java
原文地址:https://www.infoq.cn/u/yybiancheng/publish
在分布式系统中,难免涉及到对同一数据的并发操作,如何保证分布式系统中数据的并发安全呢?分布式锁!
一:分布式锁
常用的分布式锁实现方式:
1、基于数据库实现分布式锁
针对于数据库实现的分布式锁,如mysql使用使用for update
共同竞争一个行锁来实现; 在JanusGraph中,也是基于数据库实现的分布式锁,这里的数据库
指的是我们当前使用的第三方backend storage
,具体的实现方式也和mysql有所不同,具体我们会在下文分析
2、基于Redis实现的分布式锁
基于lua脚本
+setNx
实现
3、基于zk实现的分布式锁
基于znode
的有序性和临时节点
+zk的watcher
机制实现
4、MVCC多版本并发控制乐观锁实现
本文主要介绍Janusgraph的锁机制,其他的实现机制就不在此做详解了
下面我们来分析一下JanusGraph
的锁机制
实现~
二:JanusGraph锁机制
在JanusGraph中使用的锁机制是:本地锁
+ 分布式锁
来实现的;
2.1 一致性行为
在JanusGraph
中主要有三种一致性修饰词(Consistency Modifier)
来表示3种不同的一致性行为
,来控制图库使用过程中的并发问题的控制程度;
public enum ConsistencyModifier {
DEFAULT,
LOCK,
FORK
}
源码中ConsistencyModifier
枚举类主要作用:用于控制JanusGraph在最终一致或其他非事务性后端系统
上的一致性行为!其作用分别为:
DEFAULT:默认的一致性行为,不使用分布式锁进行控制,对配置的存储后端使用由封闭事务保证的默认一致性模型,一致性行为主要取决于存储后端的配置以及封闭事务的(可选)配置;无需显示配置即可使用
LOCK:在存储后端支持锁的前提下,显示的获取分布式锁以保证一致性!确切的一致性保证取决于所配置的锁实现;需management.setConsistency(element, ConsistencyModifier.LOCK);
语句进行配置
FORK:只适用于multi-edges
和list-properties
两种情况下使用;使JanusGraph修改数据时,采用先删除后添加新的边/属性的方式,而不是覆盖现有的边/属性,从而避免潜在的并发写入冲突;需management.setConsistency(element, ConsistencyModifier.FORK);
进行配置
LOCK
在查询或者插入数据时,是否使用分布式锁
进行并发控制,在图shcema
的创建过程中,如上述可以通过配置schema元素
为ConsistencyModifier.LOCK
方式控制并发,则在使用过程中就会用分布式锁
进行并发控制;
为了提高效率,JanusGraph默认不使用锁定。 因此,用户必须为定义一致性约束
的每个架构元素决定是否使用锁定。
使用JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)
显式启用对架构元素的锁定
代码如下所示:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
mgmt.commit()
FORK
由于边缘作为单个记录存储在基础存储后端中,因此同时修改单个边缘将导致冲突。
FORK
就是为了代替LOCK
,可以将边缘标签配置为使用ConsistencyModifier.FORK
。
下面的示例创建一个新的edge label,并将其设置为ConsistencyModifier.FORK
mgmt = graph.openManagement()
related = mgmt.makeEdgeLabel('related').make()
mgmt.setConsistency(related, ConsistencyModifier.FORK)
mgmt.commit()
经过上述配置后,修改标签配置为FORK
的edge时,操作步骤为:
首先,删除该边
将修改后的边作为新边添加
因此,如果两个并发事务修改了同一边缘,则提交时将存在边缘的两个修改后的副本,可以在查询遍历期间根据需要解决这些副本。
注意edge fork仅适用于MULTI edge。 具有多重性约束的边缘标签不能使用此策略,因为非MULTI的边缘标签定义中内置了一个唯一性约束,该约束需要显式锁定或使用基础存储后端的冲突解决机制
下面我们具体来看一下janusgrph
的锁机制
的实现:
2.2 LoackID
在介绍锁机制之前,先看一下锁应该锁什么东西呢?
我们都知道在janusgraph
的底层存储中,vertexId作为Rowkey,属性和边存储在cell中,由column+value组成
当我们修改节点的属性和边
+边的属性时
,很明显只要锁住对应的Rowkey + Column
即可;
在Janusgraph
中,这个锁的标识的基础部分就是LockID
:
LockID = RowKey + Column
源码如下:
KeyColumn lockID = new KeyColumn(key, column);
2.3 本地锁
本地锁
是在任何情况下都需要获取的一个锁,只有获取成功后,才会进行下述分布式锁
的获取!
本地锁
是基于图实例
维度存在的;主要作用是保证当前图实例下的操作中无冲突!
本地锁的实现是通过ConcurrentHashMap
数据结构来实现的,在图实例维度下唯一;
基于当前事务
+lockId
来作为锁标识
;
获取的主要流程:
结合源码如下:
上述图建议依照源码一块分析,源码在LocalLockMediator
类中的下述方法,下面源码分析模块
会详细分析
public boolean lock(KeyColumn kc, T requester, Instant expires) {
}
引入本地锁机制,主要目的: 在图实例维度来做一层锁判断,减少分布式锁的并发冲突
,减少分布式锁带来的性能消耗
2.4 分布式锁
在本地锁
获取成功之后才会去尝试获取分布式锁
;
分布式锁的获取整体分为两部分流程:
分布式锁信息插入
分布式锁信息状态判断
分布式锁信息插入
该部分主要是通过lockID
来构造要插入的Rowkey和column
并将数据插入到hbase
中;插入成功即表示这部分处理成功!
具体流程如下:
分布式锁信息状态判断
该部分在上一部分完成之后才会进行,主要是判断分布式锁是否获取成功!
查询出当前hbase中对应Rowkey的所有column
,过滤未过期的column集合,比对集合的第一个column是否等于当前事务插入的column;
等于则获取成功!不等于则获取失败!
具体流程如下:
三:源码分析 与 整体流程
源码分析已经push到github:https://github.com/YYDreamer/janusgraph
1、获取锁的入口
public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
if (locker != null) {
ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
if (tx.isMutationStarted())
throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
KeyColumn lockID = new KeyColumn(key, column);
log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
locker.writeLock(lockID, tx.getConsistentTx());
tx.storeExpectedValue(this, lockID, expectedValue);
} else {
store.acquireLock(key, column, expectedValue, unwrapTx(txh));
}
}
2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取
public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {
if (null != tx.getConfiguration().getGroupName()) {
MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();
}
if (lockState.has(tx, lockID)) {
log.debug("Transaction {} already wrote lock on {}", tx, lockID);
return;
}
if (lockLocally(lockID, tx)) {
boolean ok = false;
try {
S stat = writeSingleLock(lockID, tx);
lockLocally(lockID, stat.getExpirationTimestamp(), tx);
lockState.take(tx, lockID, stat);
ok = true;
} catch (TemporaryBackendException tse) {
throw new TemporaryLockingException(tse);
} catch (AssertionError ae) {
ok = true;
throw ae;
} catch (Throwable t) {
throw new PermanentLockingException(t);
} finally {
if (!ok) {
unlockLocally(lockID, tx);
if (null != tx.getConfiguration().getGroupName()) {
MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();
}
}
}
} else {
throw new PermanentLockingException("Local lock contention");
}
}
包含两个部分:
本地锁的获取lockLocally(lockID, tx)
分布式锁的获取writeSingleLock(lockID, tx)
注意此处只是将锁信息写入到Hbase中,并不代表获取分布式锁成功,只是做了上述介绍的第一个阶段分布式锁信息插入
**3、本地锁获取 lockLocally(lockID, tx)
**
public boolean lock(KeyColumn kc, T requester, Instant expires) {
assert null != kc;
assert null != requester;
final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?
new Throwable("Lock acquisition by " + requester).getStackTrace() : null;
final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);
final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);
boolean success = false;
if (null == inMap) {
if (log.isTraceEnabled()) {
log.trace("New local lock created: {} namespace={} txn={}",
kc, name, requester);
}
success = true;
} else if (inMap.equals(audit)) {
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {
if (success) {
log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
} else {
log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
}
}
} else if (0 > inMap.expires.compareTo(times.getTime())) {
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {
log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",
kc, name, inMap.holder, inMap.expires);
}
} else {
if (log.isTraceEnabled()) {
log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",
kc, name, requester, inMap);
log.trace("Owner stacktrace:\n {}", Joiner.on("\n ").join(inMap.acquiredAt));
}
}
return success;
}
如上述介绍,本地锁的实现是通过ConcurrentHashMap
数据结构来实现的,在图实例维度下唯一!
4、分布式锁获取第一个阶段:分布式锁信息插入
protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {
final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());
StaticBuffer oldLockCol = null;
for (int i = 0; i < lockRetryCount; i++) {
WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);
if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {
final Instant writeInstant = wr.getWriteTimestamp();
final Instant expireInstant = writeInstant.plus(lockExpire);
return new ConsistentKeyLockStatus(writeInstant, expireInstant);
}
oldLockCol = wr.getLockCol();
handleMutationFailure(lockID, lockKey, wr, txh);
}
tryDeleteLockOnce(lockKey, oldLockCol, txh);
throw new TemporaryBackendException("Lock write retry count exceeded");
}
上述只是将锁信息插入,插入成功标识该流程结束
5、分布式锁获取第一个阶段:分布式锁锁定是否成功判定
这一步,是在commit
阶段进行的验证
public void commit() throws BackendException {
flushInternal();
tx.commit();
}
最终会调用checkSingleLock
方法,判断获取锁的状态!
protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,
final StoreTransaction tx) throws BackendException, InterruptedException {
if (ls.isChecked())
return;
KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,
LOCK_COL_END);
List<Entry> claimEntries = getSliceWithRetries(ksq, tx);
final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,
e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));
final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));
for (TimestampRid tr : iterable) {
final Instant cutoffTime = now.minus(lockExpire);
if (tr.getTimestamp().isBefore(cutoffTime)) {
...
}
unexpiredTRs.add(tr);
}
checkSeniority(kc, ls, unexpiredTRs);
ls.setChecked();
}
四:整体流程
总流程如下图:
整体流程为:
获取本地锁
获取分布式锁
1. 插入分布式锁信息
2. commit阶段判断分布式锁获取是否成功
获取失败,则重试
五:总结
JanusGraph的锁机制主要是通过本地锁+分布式锁
来实现分布式系统下的数据一致性;
分布式锁的控制维度为:property、vertex、edge、index都可以;
JanusGraph
支持在数据导入时通过前面一致性行为
部分所说的LOCK
来开关分布式锁:
LOCK:数据导入时开启分布式锁保证分布式一致性
DEFAULT、FORK:数据导入时关闭分布式锁
是否开启分布式锁思考:
在开启分布式锁的情况下,数据导入开销非常大;如果是数据不是要求很高的一致性,并且数据量比较大,我们可以选择关闭分布式锁相关,来提高导入速度;
然后,针对于小数据量的要求高一致性的数据,单独开启分布式锁来保证数据安全;
另外,我们在不开启分布式锁定的情况下,可以通过针对于导入的数据的充分探查
来减少冲突!
针对于图schema的元素开启还是关闭分布式锁,还是根据实际业务情况来决定。
本文有任何问题,可加博主微信或评论指出,感谢!
码文不易,给个赞和star吧~
评论