Presto 基于 Airlift 构建的分布式 SQL 查询引擎,在 Presto 中 Airlift 起着举足轻重的作用,如果 Presto 是座大厦,那么 Airlift 就是大厦的地基。
Airlift 是一个可配置的、开箱即用的工具包,帮助开发者提升工作效率,将精力专注于业务实现。Airlift 提供的工具众多,今天会从基础的 configuration、 bootstrap 和 concurrent 开始介绍。
1. configuration
通过注解的方式,将配置文件中的数据绑定到配置类中,Presto 节点启动时就是通过 configuration 模块加载 etc 目录下的 config.properties 文件,获取节点配置信息。
这里将 mysql 的连接信息 配置在文件 config.properties 中:
首先定义配置类,使用注解 @Config 绑定配置文件的属性:
package com.airlift;
import io.airlift.configuration.Config;
public class MySQLConfig {
private String url;
private String user;
private String password;
private String options;
public String getUrl() {
return url;
}
@Config("url")
public MySQLConfig setUrl(String url) {
this.url = url;
return this;
}
public String getUser() {
return user;
}
@Config("user")
public MySQLConfig setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
@Config("password")
public MySQLConfig setPassword(String password) {
this.password = password;
return this;
}
public String getOptions() {
return options;
}
@Config("options")
public MySQLConfig setOptions(String options) {
this.options = options;
return this;
}
@Override
public String toString() {
return "MySQLConfig{" +
"url='" + url + '\'' +
", user='" + user + '\'' +
", password='" + password + '\'' +
", options='" + options + '\'' +
'}';
}
}
复制代码
编写 guice moudle 创建绑定关系:
class MySQLModule implements Module {
@Override
public void configure(Binder binder) {
// 需要使用 ConfigBinder
ConfigBinder configBinder = ConfigBinder.configBinder(binder);
configBinder.bindConfig(MySQLConfig.class);
}
}
复制代码
实际集成:
public class AirListConfigTest {
public static void main(String[] args) throws Exception {
// 1. 模拟 jvm 启动的附加参数 -Dconfig=config.properties
System.setProperty("config", "config.properties");
// 2. 使用工具类 ConfigurationLoader 加载配置
Map<String, String> properties = ConfigurationLoader.loadProperties();
ConfigurationFactory configurationFactory = new ConfigurationFactory(properties);
// 3. 创建 module
MySQLModule module = new MySQLModule();
configurationFactory.registerConfigurationClasses(ImmutableList.of(module));
// 4. 创建 Injector 获取配置对象
Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), module);
MySQLConfig config = injector.getInstance(MySQLConfig.class);
System.out.println(config);
}
}
复制代码
输出结果:
2. bootstrap
对象生命周期管理,在 Presto 中用于 Hive、Druid、Hudi 和 Kudu 连接器的生命周期管理,主要目的就是在对象初始化和销毁时执行特定的处理逻辑,注解 @PostConstruct 标识初始化方法,@PreDestroy 标识销毁算法。
class HelloAirLift{
private static final Logger LOGGER = Logger.get(HelloAirLift.class);
@PostConstruct
public void init(){
LOGGER.info("Opened channel");
}
@PreDestroy
public void close(){
LOGGER.info("Closed channel");
}
public void say(){
LOGGER.info("Hello AriLift");
}
}
复制代码
创建 module 建立绑定:
class CustomModule implements Module{
@Override
public void configure(Binder binder) {
binder.bind(HelloAirLift.class).in(Singleton.class);
}
}
复制代码
实际集成:
public class AirLiftStart {
public static void main(String[] args) throws Exception{
Injector injector = Guice.createInjector(new LifeCycleModule(), new CustomModule());
// 开始执行标识 @PostConstruct 的方法
LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
lifeCycleManager.start();
HelloAirLift helloAirlift = injector.getInstance(HelloAirLift.class);
helloAirlift.say();
// 开始执行标识 @PreDestroy 的方法
lifeCycleManager.stop();
}
}
复制代码
输出结果:
3. concurrent
本质上是对通过 Executors 创建出来的线程池进行了容错处理,同时还提供了线程安全的缓存工具类。
这里主要介绍 BoundedExecutor 线程池,在 Presto 中出场次数最多,主要作用是让线程数量在一个合理的、可伸缩的安全范围之内,内部实现原理:
明确设置线程的最大数量,消除 Executors.newCachedThreadPool 中最大线程数等于 Integer.MAX_VALUE 的风险;
内部维护 ConcurrentLinkedQueue 缓冲队列;
任务会直接放入缓冲队列中,如果队列中任务数未达到预设的最大数量,执行队列首个任务,执行完毕首位移除;如果已经等于或大于最大数量直接退出。
额外介绍 ThreadLocalCache 线程安全的缓存工具类,该类保证缓存的永远是最新数据,使用时需要指定缓存数量和键对应值的获取逻辑:
class ThreadLocalCacheTest{
public static void main(String[] args) throws Exception{
// 每个线程最大可缓存 10 个键
int maxPerSize = 10;
// 键对应值的生成逻辑
Function<Integer, String> loader = key -> "value_" + key;
ThreadLocalCache<Integer, String> cache = new ThreadLocalCache<>(maxPerSize, loader);
/**
* ThreadLocalCache 类里面有两个属性
* 1. ThreadLocal<Map<K, V>> cache
* 2. Function<K, V> loader;
*/
Field field = cache.getClass().getDeclaredField("cache");
field.setAccessible(true);
ThreadLocal threadLocal = (ThreadLocal)field.get(cache);
Map map = (Map)threadLocal.get();
int i = 0;
while (true){
System.out.printf("key=%s, value=%s\n",i , cache.get(i));
System.out.println(map);
TimeUnit.SECONDS.sleep(1);
i++;
}
}
}
复制代码
评论