大数据培训 flink 中核心设计、抽象和线程模型
以下文章来源于 Flink
单线程:
这段代码片段将只能同时处理一个连接,要管理多个并发客户端,需要为每个新的客户端 Socket 创建一个新的 Thread。
多线程:
Java NIO:
Java NIO 很早就提供了非阻塞调用,可以使用操作系统的事件通知 API 注册一组非阻塞套接字,以确定它们中是否有任何的套接字已经有数据可供读写。(也称为 I/O 多路复用,该接口从最初的 select 和 poll 调用到更加高性能的实现 epoll)
java.nio.channels.Selector 是 Java 的非阻塞 I/O 实现的关键。它使用了事件通知的方式以确定在一组非阻塞套接字中有哪些已经就绪能够进行 I/O 相关的操作。因为可以在任何的时间检查任意的读操作或者写操作的完成状态
Netty 是一个异步、事件驱动的网络编程框架,为了快速发展的可维护、高性能的 C/S 协议。
利用 Java 的高级网络功能,隐藏其背后的复杂性而提供一个易于使用的客户端/服务器框架。
核心设计:异步事件驱动
思想:
Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture。ChannelFuture 的 addListener 方法注册了一个 ChannelFutureListener,当操作完成时,ChannelFutureListener 可以通过 回调 的方式被异步通知_大数据培训。
阐述:
主调线程执行非阻塞方法,方法立即返回一个 Future。然后在这个 Futrue 上注册一个 Listener。主调线程便可以继续执行其他事情了。
当这个非阻塞方法真正执行完毕,会引起一些状态的变化,状态变化触发 Futrue 的一个特定事件,这个事件会使得通知注册的 Listener,然后包装 Listener 中的 operationComplete 方法为一个 Runable,扔到线程池里去执行这个函数。
源码层面的理解:
Futrue 本质是一个被观察者,Listener 是观察者,Futrue 上注册若干 Listener,当发生特定 Event,会触发 Futrue 上特定的 Listener。被观察者 Future 调用观察者 Listener 的 operationComplete 方法实现时间通知和未来逻辑的执行。
private void notifyListeners() {
// EventExecutor 就是一个线程池
EventExecutor executor = executor();
//如果当前线程是 EventExecutor 中的线程,直接执行
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//如果当前线程不是 EventExecutor 中的线程,则放入 EventExecutor 中执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
EventExecutor 本质就是一个线程池:
EventExecutor 其实就是一个只有一个 Thread 的线程池,包含在线程池组中 EventExecutorGroup。netty 的 Futrue 继承了 jdk 的 Futrue,netty 的 EventListener 继承了 jdk 的 EventListener,通过 EventListener 的 operationComplete 回调实现异步事件通知的原理是:
一个 Future 代表一个 Channel 的某个执行结果,可以添加若干个 Listener,当有了执行结果,会发出相应事件,事件触发 Future 去通知注册在相应事件下的若干 Listener,并且如果当前线程如果就是 Channel 的这个 EventExecutor(调用 inEventloop 可以得知),则立即执行,否则放入 EventExecutor 的任务队列等待执行。也就是说 Listener 的异步逻辑还是在 Channel 对应的那个 EventExecutor 的线程中执行的。这是 Netty 的异步事件驱动的源码层理解。
核心抽象——Bootstrap
Bootstrap(启动 socket)
一个引导程序,引导 Channel 供使用。Netty 有两种类型的引导:
客户端(Bootstrap)
服务端(ServerBootstrap)
核心抽象——事件循环组(线程模型)
1. EventExecutorGroup
public interface EventExecutorGroup
extends ScheduledExecutorService, Iterable<EventExecutor>
事件执行器组:通过 next()提供下一个 EventExecutor。除此之外,还负责 EventExecutor 的生命周期,例如全局性的关闭。
2. EventExecutor
public interface EventExecutor extends EventExecutorGroup
事件执行器:是一个特殊的时间执行器组,提供了判断一个线程是否属于这个事件循环。除此之外它还扩展了事件执行器组,允许以一种通用的方式访问。
3. EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup
事件循环组:一个 EventLoopGroup 包含若干 EventLoop,允许注册 Channel。提供了一种迭代,用于在事件循环中检索下一个要处理的 Channel。
4. EventLoop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup
事件循环:一个 EventLoop 被注册了若干 Channel,并处理这些 Channel 的所有 IO 事件。
一个 EventLoopGroup 包含若干 EventLoop
一个 EventLoop 在其生命周期内只能和一个 Thread 绑定,EventLoop 处理的 I/O 事件都由它绑定的 Thread 处理
核心抽象——Future(异步通知)
1. Future
public interface Future<V> extends java.util.concurrent.Future<V>
异步操作的结果。Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。
2. ChannelFuture
public interface ChannelFuture extends Future<Void>
Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果。出于这个目的,Netty 提供了接口 ChannelFuture。ChannelFuture 不是未完成就是已完成。它的 addListener 方法注册了一个 ChannelFutureListener,当操作完成时,可以被异步通知(不管成功与否)。
{@code @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
);
3. ChannelFutureListener
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture>
监听一个 ChannelFuture 的结果。通过调用 addListener(GenericFutureListener)添加一个 Listener,这个异步的 IO 操作结果将会被通知一次这个 Listener。
迅速返回调用者控制权:operationComplete(Future)会直接被一个 IO 线程调用。因此,在 IO 期间,执行一个耗时的或者阻塞的 operationComplete 会发生意料之外的事情。如果需要执行一个耗时的操作请在一个不同的线程池里执行。
核心抽象——Channel(通道)
与网络套接字或能够进行 I/O 操作(例如读取,写入,连接和绑定)的组件。与 Channel 相关的概念有以下四个:
Channel,表示一个连接,可以理解为每一个请求,就是一个 Channel。
ChannelHandler,核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext,用于传输业务数据。
ChannelPipeline,用于保存处理过程需要用到的 ChannelHandler 和 ChannelHandlerContext。
核心抽象——ChannelHandler(处理程序)
1. ChannelHandler
处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline 中的下一个处理程序。
2. Inbound/Outbound 子类
ChannelHandler 本身不提供许多方法,但是通常必须实现其子类型之一:
ChannelInboundHandler 处理入站 I/O 事件。在状态改变上添加回调函数。在状态改变时调用用户添加的钩子函数。
ChannelOutboundHandler 处理出站 I/O 操作。会得到 IO 出站操作的通知。
3. ChannelHandlerContext
ChannelHandler 随 ChannelHandlerContext 对象一起提供。ChannelHandler 应该通过上下文对象与其所属的 ChannelPipeline 进行交互。使用上下文对象,ChannelHandler 可以在上游或下游传递事件,动态修改管道或存储特定 Handler 处理程序的信息(使用 AttributeKeys)。
核心抽象——ChannelPipline(管道)
ChannelPipline 是用于存放 ChannelHandler 链的容器。一个关于 handler 的表,这些 handler 处理或者拦截一个 Channel 上的入站事件和出站操作。
核心抽象——ByteBuf(字节容器)
ByteBuf 是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持 get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:
他有三种使用模式:
1)Heap Buffer 堆缓冲区
堆缓冲区是 ByteBuf 最常用的模式,他将数据存储在堆空间。
2)Direct Buffer 直接缓冲区
直接缓冲区是 ByteBuf 的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4 引入的 nio 的 ByteBuffer 类允许 jvm 通过本地方法调用分配内存,这样做有两个好处
a) 通过免去中间交换的内存拷贝, 提升 IO 处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
b) DirectBuffer 在 -XX:MaxDirectMemorySize=xxM 大小限制下, 使用 Heap 之外的内存, GC 对此”无能为力”,也就意味着规避了在高负载下频繁的 GC 过程对应用线程的中断影响.
3)Composite Buffer 复合缓冲区
复合缓冲区相当于多个不同 ByteBuf 的视图,这是 netty 提供的,jdk 不提供这样的功能。
关系
一个 EventLoopGroup 包含若干 EventLoop
一个 EventLoop 在其生命周期内只能和一个 Thread 绑定,EventLoop 处理的 I/O 事件都由它绑定的 Thread 处理
一个 Channel 在其生命周期内,只能注册于一个 EventLoop,一个 EventLoop 可能被分配处理多个 Channel。也就是 EventLoop 与 Channel 是 1 : n 的关系
一个 Channel 上的所有 ChannelHandler 的事件由绑定的 EventLoop 中的 I/O 线程处理
不要阻塞 Channel 的 I/O 线程,可能会影响该 EventLoop 中其他 Channel 事件处理
线程模型
运行任务来处理在连接的生命周期内发生的事件是任何网络框架的基本功能。与之相应的编程上的构造通常被称为事件循环,Netty 使用了 io.netty.channel.EventLoop 来抽象。
Netty 的 EventLoop 是协同设计的一部分,它采用了两个基本的 API:并发和网络编程。首先,io.netty.util.concurrent 包构建在 JDK 的 java.util.concurrent 包上,用来提供线程执行器。其次,io.netty.channel 包中的类,为了与 Channel 的事件进行交互,扩展了这些接口/类。
在这个模型中,一个 EventLoop 将由一个永远都不会改变的 Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel。
Netty 线程模型的卓越性能取决于对于当前执行的 Thread 的身份的确定(通过调用 EventLoop 的 inEventLoop(Thread)方法实现),也就是说,确定它是否是分配给当前 Channel 以及它的 EventLoop 的那一个线程。
如果(当前)调用线程正是支撑 EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当 EventLoop 下次处理它的事件时,它会执行队列中的那些任务/事件。这也就解释了任何的 Thread 是如何与 Channel 直接交互而无需在 ChannelHandler 中进行额外同步的。注意,每个 EventLoop 都有它自已的任务队列,独立于任何其他的 EventLoop。
异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是每个 Channel 分配一个 Thread。
EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。
一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的 Thread)。这可以使你从担忧你的 ChannelHandler 实现中的线程安全和同步问题中解脱出来。
评论