写点什么

Presto 设计与实现(五):自动配置

作者:冰心的小屋
  • 2023-08-21
    北京
  • 本文字数:2946 字

    阅读完需:约 10 分钟

Presto 设计与实现(五):自动配置

Presto 基于 Airlift 构建的分布式 SQL 查询引擎,在 Presto 中 Airlift 起着举足轻重的作用,如果 Presto 是座大厦,那么 Airlift 就是大厦的地基。


Airlift 是一个可配置的、开箱即用的工具包,帮助开发者提升工作效率,将精力专注于业务实现。Airlift 提供的工具众多,今天会从基础的 configuration、 bootstrap 和 concurrent 开始介绍。

1. configuration

通过注解的方式,将配置文件中的数据绑定到配置类中,Presto 节点启动时就是通过 configuration 模块加载 etc 目录下的 config.properties 文件,获取节点配置信息。


这里将 mysql 的连接信息 配置在文件 config.properties 中:


首先定义配置类,使用注解 @Config 绑定配置文件的属性:

package com.airlift;
import io.airlift.configuration.Config;
public class MySQLConfig { private String url; private String user; private String password; private String options;
public String getUrl() { return url; }
@Config("url") public MySQLConfig setUrl(String url) { this.url = url; return this; }
public String getUser() { return user; }
@Config("user") public MySQLConfig setUser(String user) { this.user = user; return this; }
public String getPassword() { return password; }
@Config("password") public MySQLConfig setPassword(String password) { this.password = password; return this; }
public String getOptions() { return options; }
@Config("options") public MySQLConfig setOptions(String options) { this.options = options; return this; }
@Override public String toString() { return "MySQLConfig{" + "url='" + url + '\'' + ", user='" + user + '\'' + ", password='" + password + '\'' + ", options='" + options + '\'' + '}'; }}
复制代码


编写 guice moudle 创建绑定关系:

class MySQLModule implements Module {    @Override    public void configure(Binder binder) {        // 需要使用 ConfigBinder        ConfigBinder configBinder = ConfigBinder.configBinder(binder);        configBinder.bindConfig(MySQLConfig.class);    }}
复制代码


实际集成:

public class AirListConfigTest {    public static void main(String[] args) throws Exception {        // 1. 模拟 jvm 启动的附加参数 -Dconfig=config.properties        System.setProperty("config", "config.properties");
// 2. 使用工具类 ConfigurationLoader 加载配置 Map<String, String> properties = ConfigurationLoader.loadProperties(); ConfigurationFactory configurationFactory = new ConfigurationFactory(properties);
// 3. 创建 module MySQLModule module = new MySQLModule(); configurationFactory.registerConfigurationClasses(ImmutableList.of(module)); // 4. 创建 Injector 获取配置对象 Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), module); MySQLConfig config = injector.getInstance(MySQLConfig.class); System.out.println(config); }}
复制代码

输出结果:

2. bootstrap

对象生命周期管理,在 Presto 中用于 Hive、Druid、Hudi 和 Kudu 连接器的生命周期管理,主要目的就是在对象初始化和销毁时执行特定的处理逻辑,注解 @PostConstruct 标识初始化方法,@PreDestroy 标识销毁算法。


class HelloAirLift{    private static final Logger LOGGER = Logger.get(HelloAirLift.class);
@PostConstruct public void init(){ LOGGER.info("Opened channel"); }
@PreDestroy public void close(){ LOGGER.info("Closed channel"); }
public void say(){ LOGGER.info("Hello AriLift"); }}
复制代码


创建 module 建立绑定:

class CustomModule implements Module{    @Override    public void configure(Binder binder) {        binder.bind(HelloAirLift.class).in(Singleton.class);    }}
复制代码


实际集成:

public class AirLiftStart {    public static void main(String[] args) throws Exception{        Injector injector = Guice.createInjector(new LifeCycleModule(), new CustomModule());
// 开始执行标识 @PostConstruct 的方法 LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class); lifeCycleManager.start();
HelloAirLift helloAirlift = injector.getInstance(HelloAirLift.class); helloAirlift.say();
// 开始执行标识 @PreDestroy 的方法 lifeCycleManager.stop(); }}
复制代码


输出结果:

3. concurrent

本质上是对通过 Executors 创建出来的线程池进行了容错处理,同时还提供了线程安全的缓存工具类。


这里主要介绍 BoundedExecutor 线程池,在 Presto 中出场次数最多,主要作用是让线程数量在一个合理的、可伸缩的安全范围之内,内部实现原理:

  • 明确设置线程的最大数量,消除 Executors.newCachedThreadPool 中最大线程数等于 Integer.MAX_VALUE 的风险;

  • 内部维护 ConcurrentLinkedQueue 缓冲队列;

  • 任务会直接放入缓冲队列中,如果队列中任务数未达到预设的最大数量,执行队列首个任务,执行完毕首位移除;如果已经等于或大于最大数量直接退出。


额外介绍 ThreadLocalCache 线程安全的缓存工具类,该类保证缓存的永远是最新数据,使用时需要指定缓存数量和键对应值的获取逻辑:

class ThreadLocalCacheTest{    public static void main(String[] args) throws Exception{        // 每个线程最大可缓存 10 个键        int maxPerSize = 10;        // 键对应值的生成逻辑        Function<Integer, String> loader = key -> "value_" + key;        ThreadLocalCache<Integer, String> cache = new ThreadLocalCache<>(maxPerSize,  loader);
/** * ThreadLocalCache 类里面有两个属性 * 1. ThreadLocal<Map<K, V>> cache * 2. Function<K, V> loader; */ Field field = cache.getClass().getDeclaredField("cache"); field.setAccessible(true); ThreadLocal threadLocal = (ThreadLocal)field.get(cache); Map map = (Map)threadLocal.get();
int i = 0; while (true){ System.out.printf("key=%s, value=%s\n",i , cache.get(i)); System.out.println(map); TimeUnit.SECONDS.sleep(1); i++; } }}
复制代码


发布于: 刚刚阅读数: 3
用户头像

分享技术上的点滴收获! 2013-08-06 加入

一杯咖啡,一首老歌,一段代码,欢迎做客冰屋,享受编码和技术带来的快乐!

评论

发布
暂无评论
Presto 设计与实现(五):自动配置_数据湖_冰心的小屋_InfoQ写作社区