Running our Spring Boot services on k8s: syncing data to ClickHouse with Flume

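A brief introduction to ClickHouse (for details, see the official website or search online):
1) ClickHouse is not part of the Hadoop ecosystem.
2) It is queried with SQL, so it is relatively easy to pick up for anyone familiar with relational databases.
3) ClickHouse is best used for reads rather than updates, and writes should be done in batches.
4) We can use Flume to sync all kinds of log data into ClickHouse for centralized management and user behavior analysis.
5) Incremental sync from MySQL to ClickHouse. One thought here: data that no longer changes, such as system logs, transaction logs, user behavior logs, and orders that have already been placed, seems well suited to syncing into ClickHouse for reports, statistics, and data analysis.
Users usually query and operate on recent or the latest data, so that mutable data with transactional requirements can stay in MySQL. If MySQL data is synced into ClickHouse, recent and changing data is handled in MySQL while most of the remaining data is handled in ClickHouse, which eases the performance bottleneck of the relational database.
6) Faced with complex and varied data sources, we could use Flink to consolidate the data into ClickHouse.
All of the above needs to be tested against your actual situation. After all, practice is the sole criterion for testing truth.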
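The hot/cold split described in item 5) can be sketched as a simple routing rule: rows newer than a cutoff are read from MySQL, older rows from ClickHouse. This is a minimal, hypothetical illustration; the class name StoreRouter and the idea of a fixed day threshold are assumptions, not part of the original design.

```java
import java.time.LocalDate;

/**
 * Hypothetical router for the MySQL/ClickHouse split: recent, still-mutable
 * rows live in MySQL, older immutable rows are queried from ClickHouse.
 * The hotDays threshold is an assumption chosen by the caller.
 */
class StoreRouter {
    enum Store { MYSQL, CLICKHOUSE }

    private final int hotDays;

    StoreRouter(int hotDays) {
        if (hotDays < 0) {
            throw new IllegalArgumentException("hotDays must be >= 0");
        }
        this.hotDays = hotDays;
    }

    /** Rows created on or after (today - hotDays) stay in MySQL; older rows go to ClickHouse. */
    Store route(LocalDate createDate, LocalDate today) {
        LocalDate cutoff = today.minusDays(hotDays);
        return createDate.isBefore(cutoff) ? Store.CLICKHOUSE : Store.MYSQL;
    }
}
```

With a 30-day threshold and today = 2020-06-01, the cutoff is 2020-05-02: an order from 2020-05-25 is served by MySQL, one from 2020-04-01 by ClickHouse. A real system would also need to handle queries whose date range spans the cutoff.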
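A brief introduction to Flume (for details, see the official website or search online):
1) It is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data.
2) It can watch many kinds of sources, such as files or directories, for changes, capture the changed data, and push it to different downstream data stores or middleware.
3) Flume ships with many plugins for the needs in 2), for example listening on a TCP port as a data source, or watching log files in a directory for changes.
4) Flume can push changed data into a queue, which then distributes it to various data warehouse systems; it can also write the data directly to a data warehouse without a queue.
5) We can also write our own Flume NG components to meet special data sync requirements.
This article covers:
1. Implementing a custom Flume NG component (a sink)
2. Testing that the logs generated by our application are synced to ClickHouse through the sink from 1.
The core code follows:
1) Create the table in ClickHouse: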
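To make the source, channel, and sink vocabulary used later concrete, here is a toy in-process model of the flow: a source puts events into a bounded channel and a sink drains them in batches. It is NOT the Flume API; the class ToyPipeline and its methods are hypothetical and only mimic the shape of what the custom sink below does with Channel.take().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Toy model of Flume's source -> channel -> sink flow (not the Flume API).
 * The channel is a bounded FIFO queue; the sink drains it in batches.
 */
class ToyPipeline {
    private final BlockingQueue<String> channel;

    ToyPipeline(int channelCapacity) {
        this.channel = new ArrayBlockingQueue<>(channelCapacity);
    }

    /** Source side: returns false when the channel is full (real Flume applies back-pressure). */
    boolean put(String event) {
        return channel.offer(event);
    }

    /** Sink side: removes up to batchSize events, oldest first, for one batched write. */
    List<String> takeBatch(int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        channel.drainTo(batch, batchSize);
        return batch;
    }
}
```

A channel of capacity 3 accepts "a", "b", "c" and rejects "d"; takeBatch(2) then returns [a, b] and the next takeBatch(10) returns [c], which is the "underflow" case the real sink counts separately.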
CREATE TABLE default.sys_log
(
    `id`          String,
    `sys_name`    String,
    `level`       String,
    `msg`         String,
    `thread`      String,
    `create_date` DateTime,
    `exe_date`    DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
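PARTITION BY toYYYYMM(create_date) puts each calendar month of logs into its own partition. The hypothetical helper below mirrors that expression in plain Java, to show which partition a given row lands in. ClickHouse evaluates the real expression on the server, in the server's time zone, so this is only an illustration.

```java
import java.time.LocalDateTime;

/** Hypothetical Java mirror of ClickHouse's toYYYYMM(create_date) partition key. */
class PartitionKey {
    /** Returns year * 100 + month, e.g. 2020-05-21 10:15 -> 202005. */
    static int toYYYYMM(LocalDateTime createDate) {
        return createDate.getYear() * 100 + createDate.getMonthValue();
    }
}
```

Because each month is one partition, old logs can be dropped a month at a time, for example with ALTER TABLE default.sys_log DROP PARTITION 202005, instead of running row-level deletes, which fits ClickHouse's "read a lot, change little" profile.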
2) Dependencies (pom.xml) for the custom Flume sink:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
3) Core class of the custom Flume sink:
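package com.zhy.frame.newsql.clickhouse.sink.sink;

import com.google.common.base.Preconditions;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// The constants (HOST, PORT, DATABASE, TABLE, USER, PASSWORD, BATCH_SIZE, DEFAULT_*, CLICK_HOUSE_PREFIX,
// MAX_PARALLEL_REPLICAS_VALUE) and the helpers StringUtil, JsonUtil and LogLogbackVo come from the
// author's project and are not shown in this article.
public class ClickHouseSink extends AbstractSink implements Configurable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);
    private BalancedClickhouseDataSource dataSource = null;
    private SinkCounter sinkCounter = null;
    private String host = null;
    private String port = null;
    private String user = null;
    private String password = null;
    private String database = null;
    private String table = null;
    private int batchSize;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        List<LogLogbackVo> insertData = new ArrayList<>();
        try {
            // Take up to batchSize events from the channel inside one transaction
            int count;
            for (count = 0; count < batchSize; ++count) {
                Event event = ch.take();
                if (event == null) {
                    break;
                }
                insertData.add(StringUtil.buildLog(new String(event.getBody(), StandardCharsets.UTF_8)));
            }
            if (count <= 0) {
                // Nothing to write: commit the empty transaction and back off
                sinkCounter.incrementBatchEmptyCount();
                txn.commit();
                return Status.BACKOFF;
            } else if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            // Open the connection only when there is data; try-with-resources closes the
            // connection and the statement even if the insert fails
            try (ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
                 ClickHouseStatement sth = conn.createStatement()) {
                sth.write()
                        .table(String.format("%s.%s", database, table))
                        .data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes(StandardCharsets.UTF_8)),
                                ClickHouseFormat.JSONEachRow)
                        .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
                        .send();
            }
            // Count every event in the batch, not one per batch
            sinkCounter.addToEventDrainSuccessCount(count);
            status = Status.READY;
            txn.commit();
        } catch (Throwable t) {
            // Roll back so the events stay in the channel and are retried later
            txn.rollback();
            LOGGER.error(t.getMessage(), t);
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0, "ClickHouse host must be specified!");
        this.host = context.getString(HOST);
        if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
            this.host = CLICK_HOUSE_PREFIX + this.host;
        }
        Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0, "ClickHouse database must be specified!");
        this.database = context.getString(DATABASE);
        Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0, "ClickHouse table must be specified!");
        this.table = context.getString(TABLE);
        this.port = context.getString(PORT, DEFAULT_PORT);
        this.user = context.getString(USER, DEFAULT_USER);
        this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    }

    @Override
    public void start() {
        LOGGER.info("clickHouse sink {} starting", getName());
        String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
        ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
        this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
        sinkCounter.start();
        super.start();
        LOGGER.info("clickHouse sink {} started", getName());
    }

    @Override
    public void stop() {
        LOGGER.info("clickHouse sink {} stopping", getName());
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
        LOGGER.info("clickHouse sink {} stopped", getName());
    }
}
4) Flume agent configuration: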
# Name the agent's components
a1.sources = r1
a1.sinks = sink1
a1.channels = c1

# Flume source: spooling directory, with a timestamp interceptor
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/spark/flume/data/log
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Flume sink: the custom ClickHouse sink
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =

# Flume channel
# transactionCapacity must be >= the sink's batchSize (the memory channel fails a take once one
# transaction holds more events than this), and capacity must be >= transactionCapacity
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1
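The sink takes up to batchSize events inside a single channel transaction, and Flume's memory channel caps a transaction at transactionCapacity events. A small pre-flight check of the agent properties can catch a batch size that does not fit before the agent starts failing at runtime. This is a hypothetical helper (FlumeConfigCheck is not part of Flume); it assumes the memory channel's documented default of 100 for both capacity and transactionCapacity when they are absent.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sanity check for batchSize / transactionCapacity / capacity in a Flume agent config. */
class FlumeConfigCheck {
    /** Returns human-readable problems; an empty list means the three sizes are consistent. */
    static List<String> check(String propertiesText, String agent, String channel, String sink) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumed memory-channel defaults: capacity = 100, transactionCapacity = 100
        int capacity = intProp(p, agent + ".channels." + channel + ".capacity", 100);
        int txCapacity = intProp(p, agent + ".channels." + channel + ".transactionCapacity", 100);
        // The sink's batch size must be set explicitly for this check (-1 = missing)
        int batchSize = intProp(p, agent + ".sinks." + sink + ".batchSize", -1);

        List<String> problems = new ArrayList<>();
        if (batchSize <= 0) {
            problems.add("sink batchSize is missing or not positive");
        } else if (batchSize > txCapacity) {
            problems.add("batchSize " + batchSize + " > transactionCapacity " + txCapacity);
        }
        if (txCapacity > capacity) {
            problems.add("transactionCapacity " + txCapacity + " > capacity " + capacity);
        }
        return problems;
    }

    private static int intProp(Properties p, String key, int defaultValue) {
        String v = p.getProperty(key);
        // Properties.load keeps trailing spaces in values, so trim before parsing
        return (v == null || v.trim().isEmpty()) ? defaultValue : Integer.parseInt(v.trim());
    }
}
```

For example, capacity 1000, transactionCapacity 100 and batchSize 10000 yields one problem ("batchSize 10000 > transactionCapacity 100"), while 100000 / 10000 / 10000 passes.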
5) Logback configuration of the application:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log levels from low to high: TRACE < DEBUG < INFO < WARN < ERROR. If the level is set to WARN, messages below WARN are not output -->
<!-- scan: when true, the configuration file is reloaded whenever it changes. Default: false -->
<!-- scanPeriod: how often to check the configuration file for changes; without a time unit, milliseconds are assumed. Only takes effect when scan is true. Default: 1 minute -->
<!-- debug: when true, logback prints its internal status messages so you can watch how it runs. Default: false -->
<configuration scan="true" scanPeriod="7 seconds">
    <contextName>logback</contextName>
    <!-- System name -->
    <property name="sysName" value="frameSimple"/>
    <!-- Maximum size of a single log file (KB, MB) -->
    <property name="logMaxFileSize" value="1KB"/>
    <!-- Number of days to keep logs -->
    <property name="logMaxHistory" value="7"/>
    <property name="logging.path" value="sysLog"/>
    <!-- Colored logs -->
    <!-- Converter classes the colored logs depend on -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
    <conversionRule conversionWord="wex"
                    converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
    <conversionRule conversionWord="wEx"
                    converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
    <!-- Colored log pattern -->
    <property name="CONSOLE_LOG_PATTERN"
              value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
    <!-- Console output -->
    <appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
        <!-- This appender is for development; it only sets a minimum level, and the console shows events at or above that level -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <!-- Character set -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <!-- File output -->
    <!-- Time-based rolling output for DEBUG-level logs -->
    <appender name="DEBUG_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the file currently being written -->
        <file>${logging.path}/debug_${sysName}.log</file>
        <!-- Log file output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Archived logs -->
            <fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days to keep log files -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records DEBUG-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- Time-based rolling output for INFO-level logs -->
    <appender name="INFO_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the file currently being written -->
        <file>${logging.path}/info_${sysName}.log</file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Path and file name pattern of the daily archives -->
            <fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days to keep log files -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records INFO-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- Time-based rolling output for WARN-level logs -->
    <appender name="WARN_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the file currently being written -->
        <file>${logging.path}/warn_${sysName}.log</file>
        <!-- Log file output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days to keep log files -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records WARN-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!-- Time-based rolling output for ERROR-level logs -->
    <appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the file currently being written -->
        <file>${logging.path}/error_${sysName}.log</file>
        <!-- Log file output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days to keep log files -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records ERROR-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE_APPENDER"/>
        <appender-ref ref="DEBUG_APPENDER"/>
        <appender-ref ref="INFO_APPENDER"/>
        <appender-ref ref="WARN_APPENDER"/>
        <appender-ref ref="ERROR_APPENDER"/>
    </root>
</configuration>
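The logback encoder writes one JSON object per event with camelCase keys (sysName, thread, exeDate, level, msg), while default.sys_log uses snake_case columns (sys_name, exe_date). The article's StringUtil.buildLog presumably bridges the two, but its code is not shown. The sketch below is a hypothetical stand-in for that mapping: it takes the fields of one log line, already parsed into a map (JSON parsing omitted), renames the keys, and renders one JSONEachRow line, that is, a single JSON object with no surrounding array.

```java
import java.util.Map;

/** Hypothetical mapping from logback JSON fields to one ClickHouse JSONEachRow line. */
class JsonEachRow {
    /** sysName -> sys_name, exeDate -> exe_date; already-lowercase names are unchanged. */
    static String camelToSnake(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (Character.isUpperCase(c)) {
                sb.append('_').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Renders the fields, in map iteration order, as one JSON object with string values. */
    static String toRow(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(camelToSnake(e.getKey()))).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    /** Minimal JSON string escaping: quotes, backslashes and control characters. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }
}
```

A batch would then be these lines joined with newlines. The id and create_date columns are not emitted by logback, so the real helper presumably fills them in itself; that part is not modeled here.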
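In summary, application logs are first written to the active info, warn, and error files (info_${sysName}.log and so on) and are then rolled, by date and size, into the info, warn, and error directories. A spooling-directory source requires that files no longer change once they are in the watched directory, so Flume should watch only these archive directories, never the active files. The debug directory can be ignored: with the root level at INFO, DEBUG events are never written. Note that a spooldir source reads a single spoolDir, so covering all three directories typically takes one source per directory.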
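When checking that the pipeline keeps up, it helps to see which rolled archives Flume has not consumed yet. The hypothetical helper below lists them under the info, warn, and error directories. It assumes Flume's default spooldir fileSuffix, ".COMPLETED", which the source appends to a file once it has been fully ingested, so any archive still ending in ".log" is pending.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: rolled archives that a spooldir source has not marked .COMPLETED yet. */
class PendingArchives {
    static List<Path> pending(Path logRoot, String... levels) {
        List<Path> result = new ArrayList<>();
        for (String level : levels) {
            Path dir = logRoot.resolve(level);
            if (!Files.isDirectory(dir)) {
                continue; // e.g. no WARN events have been rolled yet
            }
            // "*.log" matches the whole file name, so "x.log.COMPLETED" is excluded
            try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.log")) {
                for (Path f : files) {
                    result.add(f);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

Calling pending(Paths.get("sysLog"), "info", "warn", "error") on a steady system should return an empty or short list; a list that keeps growing suggests the sink is backing off, for example because of ClickHouse errors.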
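Copyright notice: this article is an original work by InfoQ author 【柠檬】.
Original link: 【http://xie.infoq.cn/article/6c71ebe26b990283ef228765b】.
This article is licensed under 【CC BY-NC】; when reposting, please keep the original source and this copyright notice.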
 
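柠檬
Success is not yet achieved; friends, keep working hard. Joined 2020.05.21
Long engaged in backend development and architecture design for microservices and middle platforms. Some of my views and implementations are available at https://gitee.com/lvmoney/zhy-frame-parent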