Pointing the Sword at Pulsar: The Data Write Path
- 2023-11-03, Guangdong
Introduction
Q: What is the single most important capability of a software component?
A: Reading and writing. However impressive a component is, it ultimately serves people, i.e. it interacts with them directly or indirectly, and that interaction boils down to read and write operations.
Reading and writing are the most fundamental and most important capabilities of any software, bar none; think about it, and every service a piece of software provides is essentially a read or a write. Of the two, writing is the more central, because the write path determines how the data can be read, and the storage layout fixed at write time already constrains the component's applicable scenarios. A simple example: compared with row-oriented storage, column-oriented storage suits analytics better, because analytical queries usually touch only a few fields, so loading just those columns is far faster than loading all of them. Columnar layouts also make it easy to pre-aggregate data in small batches at write time, so when an aggregation is actually needed, reading each batch's metadata is enough to produce a result quickly (a tiny sketch of this batch-metadata idea follows). That is why, when studying Pulsar, we should pay special attention to its data write path. Approaching it from this angle also helps us understand its internal machinery, with three payoffs: ① when problems occur we can locate the root cause quickly; ② we learn how to use Pulsar well; ③ we see how Pulsar solves the problems of this domain, broadening our knowledge so that someday we too can design a "perfect system".
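To make the columnar point above concrete, here is an illustrative sketch (not Pulsar code, and not any particular column store): one numeric column stored in batches, each carrying precomputed metadata, so a column-wide aggregate reads only the per-batch metadata instead of every value.
import java.util.ArrayList;
import java.util.List;

class ColumnWithBatchStats {
    static final class BatchMeta {
        final long min, max, sum; final int count;
        BatchMeta(long min, long max, long sum, int count) {
            this.min = min; this.max = max; this.sum = sum; this.count = count;
        }
    }

    private final List<long[]> batches = new ArrayList<>();
    private final List<BatchMeta> metas = new ArrayList<>();

    void addBatch(long[] values) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE, sum = 0;
        for (long v : values) { min = Math.min(min, v); max = Math.max(max, v); sum += v; }
        batches.add(values);                                      // the raw column data
        metas.add(new BatchMeta(min, max, sum, values.length));   // metadata written alongside it
    }

    long sum() {                                                  // answered from metadata alone
        long total = 0;
        for (BatchMeta m : metas) total += m.sum;
        return total;
    }
}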
One scope note before we dive in: this article covers only the server side, walking through the chain of actions that starts once the server receives a write request, from design down to implementation.
Design
Service invocation overview
Data writes in Pulsar are performed jointly by Pulsar and BookKeeper. Their server instances, Broker and Bookie, are both JVM processes: on startup each brings up a Netty instance that listens on a TCP port and handles incoming requests (a minimal sketch of this server pattern follows). As the diagram shows, when a write request arrives over the network, Netty picks it up and enqueues it in the task queue; when the task is processed, it calls the write interface the Broker exposes, and the actual write is forwarded over the network to the Bookie service. The Bookie's processing flow is similar, and the real disk write happens at step 7, where the journal persists the data to disk.
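For readers less familiar with this pattern, here is a minimal, self-contained sketch of a Netty TCP server of the kind Broker and Bookie each start; the handler is illustrative, not Pulsar's actual classes (6650 is Pulsar's default broker port).
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MiniServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup(); // each event loop drains its task queue in order
        try {
            ServerBootstrap b = new ServerBootstrap()
                .group(boss, workers)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf buf = (ByteBuf) msg;
                                try {
                                    // decode the frame and dispatch by command type,
                                    // as PulsarDecoder#channelRead does below
                                } finally {
                                    buf.release();
                                }
                            }
                        });
                    }
                });
            b.bind(6650).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}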
Internal call flow
The diagram above shows clearly how a message travels between the services:
The producer sends a write request to the Broker.
To speed up tailing reads, the Broker caches the data in local memory once the write to BookKeeper has succeeded.
The data is written into Netty's request queue, and the messages are distributed round-robin across different Ledgers.
The message index is stored in RocksDB, while the data itself lands on disk.
The message is written into the MemTable, BookKeeper's write cache.
The message is written into the operating system's PageCache and eventually flushed to disk.
As the steps above show, each entry is written to both the Ledger and the Journal: the Journal exists for durable storage, while the Ledger exists to speed up catch-up reads, with RocksDB used to quickly locate where the target data sits on disk so it can be read back fast (a conceptual sketch of this dual write follows). That is roughly the design of Pulsar's data write path; next, let's savor its implementation together.
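To make the Journal/Ledger split concrete, here is a conceptual sketch, not Bookie code: each entry is appended twice, once to a sequential journal that is fsync'ed for durability, and once to an entry log whose position is recorded in an index. The Bookie keeps that index in RocksDB; a plain map stands in for it here.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DualWriteSketch {
    private final FileChannel journal;   // sequential, fsync'ed: exists for crash recovery
    private final FileChannel entryLog;  // laid out for reads: serves catch-up reads
    private final Map<Long, Long> index = new ConcurrentHashMap<>(); // entryId -> offset (RocksDB in the Bookie)

    DualWriteSketch(Path dir) throws IOException {
        journal = FileChannel.open(dir.resolve("journal.bin"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        entryLog = FileChannel.open(dir.resolve("entrylog.bin"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    void addEntry(long entryId, byte[] payload) throws IOException {
        // 1. journal: append and force() before acking, so the write survives a crash
        journal.write(ByteBuffer.wrap(payload));
        journal.force(false);
        // 2. entry log: append and remember where the entry landed, for later reads
        long offset = entryLog.position();
        entryLog.write(ByteBuffer.wrap(payload));
        index.put(entryId, offset);
    }
}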
Implementation
Both the Broker side and the BookKeeper side are Netty-based JVM instances, and the services they expose are built on Netty's read/write capabilities. The write path's call stack looks like this:
PulsarDecoder#channelRead //implements Netty's read hook; parses the request type and picks the matching handler
ServerCnx#handleSend //the method that actually handles write requests
Producer#publishMessage
Producer#publishMessageToTopic
PersistentTopic#publishMessage
PersistentTopic#asyncAddEntry
ManagedLedgerImpl#asyncAddEntry
ManagedLedgerImpl#internalAsyncAddEntry
OpAddEntry#initiate
LedgerHandle#asyncAddEntry
LedgerHandle#doAsyncAddEntry
Queue<PendingAddOp>#add //the write operation is finally placed into a queue
PendingAddOp#writeComplete
PendingAddOp#sendAddSuccessCallbacks
LedgerHandle#sendAddSuccessCallbacks
PendingAddOp#sendWriteRequest
Bookie#addEntry
Bookie#addEntryInternal
Journal#logAddEntry
BlockingQueue<QueueEntry>#add
Journal#run
BufferedChannel#write(bc.write)
With the call stack in view, let's dig into the concrete implementations, starting from the entry point PulsarDecoder#channelRead:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
try {
int cmdSize = (int) buffer.readUnsignedInt();
cmd.parseFrom(buffer, cmdSize);
messageReceived();
switch (cmd.getType()) {
case PARTITIONED_METADATA:
//.....
case PARTITIONED_METADATA_RESPONSE:
//.....
case LOOKUP:
//.....
case LOOKUP_RESPONSE:
//.....
case ACK:
//.....
case ACK_RESPONSE:
//.....
case CLOSE_CONSUMER:
//.....
case CLOSE_PRODUCER:
//.....
case CONNECT:
//.....
case CONNECTED:
//.....
case ERROR:
//.....
case FLOW:
//.....
case MESSAGE:
//.....
case PRODUCER:
//.....
//the core case for writing messages
case SEND: {
checkArgument(cmd.hasSend());
try {
interceptCommand(cmd);
// Store a buffer marking the content + headers
ByteBuf headersAndPayload = buffer.markReaderIndex();
handleSend(cmd.getSend(), headersAndPayload);
} catch (InterceptException e) {
//.....
}
break;
}
case SEND_ERROR:
//.....
case SEND_RECEIPT:
//.....
case SUBSCRIBE:
//.....
case SUCCESS:
//.....
case PRODUCER_SUCCESS:
//.....
case UNSUBSCRIBE:
//.....
case SEEK:
//.....
case PING:
//.....
case PONG:
//.....
case REDELIVER_UNACKNOWLEDGED_MESSAGES:
//.....
case CONSUMER_STATS:
//.....
case CONSUMER_STATS_RESPONSE:
//.....
case REACHED_END_OF_TOPIC:
//.....
case GET_LAST_MESSAGE_ID:
//.....
case GET_LAST_MESSAGE_ID_RESPONSE:
//.....
case ACTIVE_CONSUMER_CHANGE:
//.....
case GET_TOPICS_OF_NAMESPACE:
//.....
case GET_TOPICS_OF_NAMESPACE_RESPONSE:
//.....
case GET_SCHEMA:
//.....
//many cases omitted
default:
break;
}
} finally {
buffer.release();
}
}
Whenever a request reaches Pulsar, Netty picks it up and triggers channelRead. Inside channelRead, Pulsar uses a facade-style switch that enumerates all of the defined command handlers: it parses the request type and dispatches to the matching case, keeping the code readable and maintainable.
A write request falls into the SEND case, which calls ServerCnx#handleSend. Let's take a closer look at that handler:
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
//when a producer starts it connects to the Pulsar cluster; the server creates a proxy object for it and assigns a unique producerId
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
return;
}
Producer producer = producerFuture.getNow(null);
if (log.isDebugEnabled()) {
printSendCommandDebug(send, headersAndPayload);
}
//for a non-persistent topic that has not exceeded its rate limit, the message is written straight to memory
if (producer.isNonPersistentTopic()) {
// avoid processing non-persist message if reached max concurrent-message limit
if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
final long highestSequenceId = send.getHighestSequenceId();
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
}));
producer.recordMessageDrop(send.getNumMessages());
return;
} else {
//count the messages being processed, used for rate limiting
nonPersistentPendingMessages++;
}
}
//rate limiting
startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
//....
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
} else {
//the core call
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.isIsChunk(), send.isMarker());
}
}
ServerCnx#handleSend mainly performs the rate-limiting bookkeeping, and handles persistent and non-persistent topics with different code paths.
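The startSendOperation call above is where backpressure kicks in. Here is a hedged sketch of the idea, with illustrative names rather than ServerCnx's actual fields: count in-flight publishes, and when the limit is reached stop reading from the socket, letting TCP flow control push back on the producer.
import io.netty.channel.Channel;

class PendingPublishLimiter {
    private final int maxPending;
    private int pending;

    PendingPublishLimiter(int maxPending) { this.maxPending = maxPending; }

    synchronized void startSendOperation(Channel ch) {
        if (++pending >= maxPending) {
            ch.config().setAutoRead(false); // pause reads until completions drain the backlog
        }
    }

    synchronized void completedSendOperation(Channel ch) {
        if (--pending < maxPending && !ch.config().isAutoRead()) {
            ch.config().setAutoRead(true);  // resume reads once under the limit
        }
    }
}
With that in mind, let's continue into Producer#publishMessage: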
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
boolean isChunked, boolean isMarker) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker);
}
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker));
}
Producer#publishMessage does little more than a few checks before calling publishMessageToTopic, which in turn simply calls PersistentTopic#publishMessage. Let's keep going:
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
//used for rate limiting and for monitoring metrics
pendingWriteOps.incrementAndGet();
if (isFenced) {
publishContext.completed(new TopicFencedException("fenced"), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
case NotDup:
//core logic: trigger message persistence
asyncAddEntry(headersAndPayload, publishContext);
break;
case Dup:
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
decrementPendingWriteOpsAndCheck();
break;
default:
publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
decrementPendingWriteOpsAndCheck();
}
}
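The MessageDeduplication.isDuplicate check above deserves a pause. Pulsar's deduplication rests on producers attaching monotonically increasing sequence ids to what they publish, so a repeat of an already-persisted id can be acked without being written again. A simplified sketch of that idea, with illustrative names (the real implementation also handles an "unknown" status and snapshots its state):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SequenceDedup {
    private final Map<String, Long> highestSequencePushed = new ConcurrentHashMap<>();

    enum Status { NotDup, Dup }

    Status isDuplicate(String producerName, long sequenceId) {
        Long highest = highestSequencePushed.get(producerName);
        if (highest != null && sequenceId <= highest) {
            return Status.Dup; // already stored: ack immediately, skip the write
        }
        highestSequencePushed.put(producerName, sequenceId);
        return Status.NotDup;
    }
}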
PersistentTopic#publishMessage likewise mostly performs checks; its core logic is the call to PersistentTopic#asyncAddEntry. Let's continue:
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
if (brokerService.isBrokerEntryMetadataEnabled()) {
ledger.asyncAddEntry(headersAndPayload,
(int) publishContext.getNumberOfMessages(), this, publishContext);
} else {
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
}
ManagedLedgerImpl#asyncAddEntry
public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}
// retain buffer in this thread
buffer.retain();
// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
}
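Note the executeOrdered call above: rather than having writers from different threads contend on locks, all operations on one managed ledger are pinned to a single thread. A minimal sketch of such a key-ordered executor (illustrative, not Pulsar's OrderedExecutor API):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedExecutorSketch {
    private final ExecutorService[] threads;

    OrderedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    void executeOrdered(String key, Runnable task) {
        int idx = (key.hashCode() & Integer.MAX_VALUE) % threads.length;
        threads[idx].execute(task); // same key -> same thread -> ordered, lock-free execution
    }
}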
ManagedLedgerImpl#internalAsyncAddEntry
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
//....
pendingAddEntries.add(addOperation);
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
if (log.isDebugEnabled()) {
log.debug("[{}] Queue addEntry request", name);
}
if (State.CreatingLedger == state) {
long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
" and creation timeout task didn't kick in as well. Force to fail the create ledger operation.",
name, elapsedMs);
this.createComplete(Code.TimeoutException, null, null);
}
}
} else if (state == State.ClosedLedger) {
// No ledger and no pending operations. Create a new ledger
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
log.info("[{}] Creating a new ledger", name);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
} else {
checkArgument(state == State.LedgerOpened, "ledger=%s is not opened", state);
// Write into lastLedger
addOperation.setLedger(currentLedger);
++currentLedgerEntries;
currentLedgerSize += addOperation.data.readableBytes();
if (log.isDebugEnabled()) {
log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(),
currentLedgerEntries);
}
if (currentLedgerIsFull()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Closing current ledger lh={}", name, currentLedger.getId());
}
// This entry will be the last added to current ledger
addOperation.setCloseWhenDone(true);
STATE_UPDATER.set(this, State.ClosingLedger);
}
//the core call
addOperation.initiate();
}
}
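A quick note on the rollover logic above: currentLedgerIsFull() decides when the current ledger should be closed and a new one created. As a hedged sketch of the check, with illustrative config names (the real method also weighs time-based rollover and space quotas):
class LedgerRolloverCheck {
    final long maxEntriesPerLedger;   // illustrative cap on entries per ledger
    final long maxSizePerLedgerBytes; // illustrative cap on bytes per ledger

    LedgerRolloverCheck(long maxEntries, long maxSizeBytes) {
        this.maxEntriesPerLedger = maxEntries;
        this.maxSizePerLedgerBytes = maxSizeBytes;
    }

    boolean currentLedgerIsFull(long currentLedgerEntries, long currentLedgerSize) {
        return currentLedgerEntries >= maxEntriesPerLedger
                || currentLedgerSize >= maxSizePerLedgerBytes;
    }
}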
OpAddEntry#initiate
public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {
ByteBuf duplicateBuffer = data.retainedDuplicate();
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
} else {
log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state);
}
}
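One recurring detail on this path is ByteBuf reference counting: buffer.retain() in asyncAddEntry and data.retainedDuplicate() here exist because each asynchronous stage that outlives its caller must hold its own reference to the buffer and release it when finished. A small runnable demo of the semantics:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountDemo {
    public static void main(String[] args) {
        ByteBuf payload = Unpooled.wrappedBuffer("hello".getBytes());
        System.out.println(payload.refCnt());            // 1: owned by the caller

        ByteBuf forBookie = payload.retainedDuplicate(); // shares memory, own indices, refCnt -> 2
        // ... hand forBookie to the BookKeeper client, which releases it when the write completes
        forBookie.release();                             // refCnt -> 1

        payload.release();                               // refCnt -> 0, memory reclaimed
    }
}
Next, LedgerHandle#asyncAddEntry and LedgerHandle#doAsyncAddEntry: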
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
doAsyncAddEntry(op);
}
protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
}
boolean wasClosed = false;
synchronized (this) {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
if (isHandleWritable()) {
long entryId = ++lastAddPushed;
long currentLedgerLength = addToLength(op.payload.readableBytes());
op.setEntryId(entryId);
op.setLedgerLength(currentLedgerLength);
pendingAddOps.add(op);
} else {
wasClosed = true;
}
}
if (wasClosed) {
// make sure the callback is triggered in main worker pool
try {
clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
}
@Override
public String toString() {
return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
}
});
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
}
return;
}
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
try {
if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
ws.recycle();
}
try {
clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(
BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
}
}
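The DistributionSchedule.getWriteSet call above decides which bookies in the ensemble receive this entry. As a hedged sketch of BookKeeper's round-robin schedule: with an ensemble of E bookies and a write quorum of W, entry e is striped to bookies (e % E), (e+1) % E, ..., (e+W-1) % E.
class RoundRobinWriteSet {
    static int[] writeSet(long entryId, int ensembleSize, int writeQuorum) {
        int[] ws = new int[writeQuorum];
        for (int i = 0; i < writeQuorum; i++) {
            ws[i] = (int) ((entryId + i) % ensembleSize); // stripe entries across the ensemble
        }
        return ws;
    }

    public static void main(String[] args) {
        // ensemble=5, writeQuorum=3: entry 0 -> [0,1,2], entry 1 -> [1,2,3], ...
        System.out.println(java.util.Arrays.toString(writeSet(0, 5, 3)));
        System.out.println(java.util.Arrays.toString(writeSet(1, 5, 3)));
    }
}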
That covers the broker-side source of Pulsar's data write path; from PendingAddOp#sendWriteRequest onward, the request travels to the Bookie, whose remaining steps (Bookie#addEntry down through Journal#logAddEntry and BufferedChannel#write) follow the call stack listed earlier.
Summary
After reading this, Pulsar should feel a lot less mysterious: it is built from thread scheduling, file I/O, NIO, and other such fundamentals, which is exactly why mastering the basics is so necessary. I think of low-level knowledge as Lego bricks: facing different monsters (scenarios), our warriors (engineers) assemble those bricks into all kinds of weapons, and how well a weapon works depends on how well we know the bricks (the fundamentals). Pulsar, likewise, is a sword forged by many outstanding warriors. We should get familiar with it and use it well, but more importantly we should understand how it was forged, so that one day we can forge an even stronger weapon to slay whatever dragons we meet.
Copyright notice: this article is an original piece by InfoQ author 少年游侠客.
Original link: http://xie.infoq.cn/article/96083c2206468084a5ec0640d. Please contact the author before republishing.