ES 操作之批量写 -BulkProcessor 原理浅析
[TOC]
最近对线上业务进行重构,涉及到ES同步这一块,在重构过程中,为了ES 写入 性能考虑,大量的采取了 bulk的方式,来保证整体的一个同步速率,针对BulkProcessor 来深入一下,了解下 是如何实现,基于请求数,请求数据量大小 和 固定时间,刷新写入ES 的原理针对ES 批量写入, 提供了3种方式,在 high-rest-client 中分别是 bulk bulkAsync bulkProcessor 3种方式。本文主要针对 bulkProcessor 来进行一些讲述
BulkProcessor
官方介绍
BulkProcessor是一个线程安全的批量处理类,允许方便地设置 刷新 一个新的批量请求 (基于数量的动作,根据大小,或时间),容易控制并发批量的数量请求允许并行执行。
创建流程
How To use ?
来看个demo 创建BulkProcessor
@Bean(name = "bulkProcessor") // 可以封装为一个bean,非常方便其余地方来进行 写入 操作 public BulkProcessor bulkProcessor(){ BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener); return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // todo do something int i = request.numberOfActions(); log.error("ES 同步数量{}",i); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // todo do something Iterator<BulkItemResponse> iterator = response.iterator(); while (iterator.hasNext()){ System.out.println(JSON.toJSONString(iterator.next())); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // todo do something log.error("写入ES 重新消费"); } }).setBulkActions(1000) // 达到刷新的条数 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) // 达到 刷新的大小 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 固定刷新的时间频率 .setConcurrentRequests(1) //并发线程数 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // 重试补偿策略 .build(); }
使用BulkProcessor
bulkProcessor.add(xxxRequest)
创建过程做了些什么?
创建一个consumer 对象用来封装传递参数,和请求操作
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
我们可以看到用了java 8的函数式编程接口 BiConsumer 关于 BiConsumer 的用法,可以自行百度,因为也是采取的 异步刷新策略, 所以,是一个返回结果的Listener ActionListener<BulkResponse>
构建并BulkProcess
return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() { **** }).setBulkActions(1000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); }
可以很清楚的看到,在build 操作中,我们看到,在build 中,除了 之前定义的consumer,还实现了一个 Listener 接口 (稍后会具体讲到),用来做一些 在批量求情之前和请求之后的处理。
至此为止,BulkProcessor 创建,就OK啦~。
内部逻辑实现
先不说话,我们先上张类图
可以看到,在 BulkProcessor 中,有这样的一些类和接口
ListenerBuilderBulkProcessorFlush===== 华丽的分界线BulkProcessor 实现了 Closeable --> 继承自 AutoCloseable (关于AutoCloseable 本文不做过多说明,具体的可以百度,或者等待后续)
那么先从构建开始,我们来看下Builder
/*** 简单的构建,可以看到,就是一个client 和 listener 这个不会做刷新策略,*/public static Builder builder(Client client, Listener listener) { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); return new Builder(client::bulk, listener, client.threadPool(), () -> {}); }/*** 所有功能的builder 实现方法* ScheduledThreadPoolExecutor 用来实现 按照时间频率,来进行 刷新,如 每5s **/ public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) { Objects.requireNonNull(consumer, "consumer"); Objects.requireNonNull(listener, "listener"); final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); // 接口静态方式,来实现 Executor 的初始化 return new Builder(consumer, listener, (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), // () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); }/*** 构造函数 * @param consumer 前文定义的consumer request response action* @param listener listener BulkProcessor 内置监听器* @param scheduler elastic 定时调度 类scheduler * @paran onClose 关闭时候的运行*/ private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, Scheduler scheduler, Runnable onClose) { this.consumer = consumer; this.listener = listener; this.scheduler = scheduler; this.onClose = onClose; }
通过上述的代码片段,可以很明显的看到,关于初始化构建的一些关键点和要素
看完builder 接下来,我们看下 bulkprocessor 是如何工作的
先看下构造方法
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, Scheduler scheduler, Runnable onClose) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.bulkRequest = new BulkRequest(); this.scheduler = scheduler; this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // BulkRequestHandler 批量执行 handler 操作 // Start period flushing task after everything is setup this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); //开始刷新任务 this.onClose = onClose; }
startFlushTask 如何进行工作
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) { // 如果 按照时间刷新 为空,则直接返回 任务为取消状态 if (flushInterval == null) { return new Scheduler.Cancellable() { @Override public void cancel() {} @Override public boolean isCancelled() { return true; } }; } final Runnable flushRunnable = scheduler.preserveContext(new Flush()); return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC); } private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { return; } execute(); }// 刷新线程class Flush implements Runnable { @Override public void run() { synchronized (BulkProcessor.this) { if (closed) { return; } if (bulkRequest.numberOfActions() == 0) { return; } execute(); // 下面方法 } } }/*** 刷新执行**/ // (currently) needs to be executed under a lock private void execute() { final BulkRequest bulkRequest = this.bulkRequest; final long executionId = executionIdGen.incrementAndGet(); // 刷新 bulkRequest 为下一批做准备 this.bulkRequest = new BulkRequest(); this.bulkRequestHandler.execute(bulkRequest, executionId); }
看到这里,关于时间的定时调度,我们其实是很清楚了,那么 关于数据量 和 大小的判断策略在哪儿?
/*** 各种添加操作*/public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) { internalAdd(request, payload); return this; } /** * 我们可以看到,在添加之后,会做一个操作 * executeIfNeeded 如果需要,则进行执行 */ private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) { ensureOpen(); bulkRequest.add(request, payload); executeIfNeeded(); }/*** 如果超过限制,则执行刷新操作*/ private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { return; } execute(); } /** * 这这儿,我们终于看到了 关于action 和 size 的判断操作, * */ private boolean isOverTheLimit() { if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) { return true; } if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) { return true; } return false; }
**通过上述的分析,关于按照时间,数据size,大小来进行flush 执行的入口我们都已经很清楚了
**
针对数据大小的设置。在每次添加的时候,做判断是否 超过限制针对 时间的频次控制,交由ScheduledThreadPoolExecutor 来去做监控
下来,让我们看下具体的执行以及重试策略,和 返回值的处理
public void execute(BulkRequest bulkRequest, long executionId) { Runnable toRelease = () -> {}; boolean bulkRequestSetupSuccessful = false; try { // listener 填充 request 和执行ID listener.beforeBulk(executionId, bulkRequest); //通过信号量来进行资源的控制 来自于我们设置的 setConcurrentRequests semaphore.acquire(); toRelease = semaphore::release; CountDownLatch latch = new CountDownLatch(1); // 进行执行并按照补偿重试策略如果失败 retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() { //结果写入 ActionListener --> BulkProcessor.Listener 的转换 @Override public void onResponse(BulkResponse response) { try { listener.afterBulk(executionId, bulkRequest, response); } finally { semaphore.release(); latch.countDown(); } } @Override public void onFailure(Exception e) { try { listener.afterBulk(executionId, bulkRequest, e); } finally { semaphore.release(); latch.countDown(); } } }, Settings.EMPTY); bulkRequestSetupSuccessful = true; if (concurrentRequests == 0) { latch.await(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e); listener.afterBulk(executionId, bulkRequest, e); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e); listener.afterBulk(executionId, bulkRequest, e); } finally { if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore toRelease.run(); } } }
最终的执行,在 RetryHandler 中,继续往下看
public void execute(BulkRequest bulkRequest) { this.currentBulkRequest = bulkRequest; consumer.accept(bulkRequest, this); }
对,没错,只有一个操作, consumer.accept(bulkRequest, this);
再一次展现了 java 8 函数式接口的功能强大之处 此处 consumer.accept(bulkRequest, this);
执行的操作即 Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
如何设置重试策略,以及数据的筛选
@Override public void onResponse(BulkResponse bulkItemResponses) { if (!bulkItemResponses.hasFailures()) { // we're done here, include all responses addResponses(bulkItemResponses, (r -> true)); finishHim(); } else { if (canRetry(bulkItemResponses)) { addResponses(bulkItemResponses, (r -> !r.isFailed())); retry(createBulkRequestForRetry(bulkItemResponses)); } else { addResponses(bulkItemResponses, (r -> true)); finishHim(); } } }/*** 只针对失败的请求,放入到重试策略中*/ private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) { for (BulkItemResponse bulkItemResponse : response) { if (filter.test(bulkItemResponse)) { // Use client-side lock here to avoid visibility issues. This method may be called multiple times // (based on how many retries we have to issue) and relying that the response handling code will be // scheduled on the same thread is fragile. synchronized (responses) { responses.add(bulkItemResponse); } } } }
再一次展示了函数式接口的强大之处
补充一张流程图
对这次的代码一些了解,对其中的一些设计理念和模式,学习到了不少的知识
>
如consumer, Listener,模式的应用。
如consumer ,Predicate 等函数式接口的用法。
如对线程的统一封装,来去做更统一的抽象处理
>
在后续的代码设计中,也会尝试采取这种设计模式,来写出更优雅的代码。es java api的代码内的不少设计模式,自我感觉还是很nice,后续会继续学习
仅代表个人见解,如有不对之出,欢迎指出
版权声明: 本文为 InfoQ 作者【PCMD】的原创文章。
原文链接:【http://xie.infoq.cn/article/64059296ef803dbf2c054bc03】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
PCMD
我看青山多妩媚 2020.02.07 加入
还未添加个人简介
评论