Running Our Spring Boot Services on k8s: Syncing Data to ClickHouse with Flume
A brief introduction to ClickHouse (see the official documentation for details):
1) ClickHouse is not part of the Hadoop ecosystem.
2) It uses SQL, so the learning curve is gentle for anyone already familiar with relational databases.
3) ClickHouse works best for reads; avoid updates and deletes, and perform writes in batches (a minimal batch-write sketch follows this list).
4) All kinds of log data can be synced to ClickHouse via Flume for centralized management and user-behavior analysis.
5) MySQL can be incrementally synced to ClickHouse. One thought: immutable data such as system logs, transaction logs, user-behavior logs, and already-completed orders could be synced to ClickHouse for reporting, statistics, and data analysis.
Since users mostly query and operate on recent or latest data, the frequently changing data with transactional requirements can stay in MySQL. By syncing MySQL data to ClickHouse, the recent and mutable data is handled in MySQL while the bulk of the historical data is queried in ClickHouse, relieving the relational database's performance bottleneck.
6) For a tangle of heterogeneous data sources, Flink could be used to funnel everything into ClickHouse.
All of the above needs to be tested against your actual workload before adoption. After all, practice is the sole criterion for testing truth.
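To make point 3 concrete, here is a minimal sketch of a batched write using plain JDBC and the clickhouse-jdbc driver (the same driver the custom sink below depends on). The table and columns match the sys_log table created in step 1 of the walkthrough; the URL, credentials, and sample values are placeholders for illustration only.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import ru.yandex.clickhouse.ClickHouseDataSource;

public class ClickHouseBatchWriteDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; adjust host/port/credentials to your environment.
        ClickHouseDataSource dataSource =
                new ClickHouseDataSource("jdbc:clickhouse://localhost:8123/default");
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO default.sys_log (id, sys_name, level, msg, thread, create_date, exe_date) "
                             + "VALUES (?, ?, ?, ?, ?, ?, ?)")) {
            Timestamp now = new Timestamp(System.currentTimeMillis());
            // Accumulate many rows and send them in one round trip instead of inserting row by row.
            for (int i = 0; i < 10_000; i++) {
                ps.setString(1, String.valueOf(i));
                ps.setString(2, "frameSimple");
                ps.setString(3, "INFO");
                ps.setString(4, "demo message " + i);
                ps.setString(5, "main");
                ps.setTimestamp(6, now);
                ps.setTimestamp(7, now);
                ps.addBatch();
            }
            ps.executeBatch(); // one bulk insert for the whole batch
        }
    }
}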
A brief introduction to Flume (see the official documentation for details):
1) A highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data.
2) It can watch many kinds of files and directories, in several ways, for changes, pick up the changed data, and push it to different downstream data stores.
3) Flume ships with a variety of plugins to cover point 2, for example listening on a TCP port as a data source, or monitoring log files under a directory.
4) Flume can push the changed data into a message queue, which then distributes it to the various data-warehouse systems, or it can skip the queue and write directly to the warehouse.
5) We can also implement a custom Flume NG component to meet our own special sync requirements, which is exactly what this article does below.
This article covers two things:
1. Implementing a custom Flume NG sink
2. Verifying that the logs produced by our system are synced to ClickHouse through the sink from step 1
The core code follows.
1) Create the table in ClickHouse:
CREATE TABLE default.sys_log
(
    `id`          String,
    `sys_name`    String,
    `level`       String,
    `msg`         String,
    `thread`      String,
    `create_date` DateTime,
    `exe_date`    DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
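One note before moving on: the custom sink in step 3 serializes each log event into a LogLogbackVo and ships the batch to ClickHouse in JSONEachRow format, where the JSON keys must match the column names of the table above. The article does not show that class, so the following is only a hypothetical sketch of what it could look like, using fastjson and Lombok (both declared in the pom.xml in step 2).

import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;

/**
 * Hypothetical value object mirroring the default.sys_log columns.
 * JSONEachRow matches rows to columns by JSON key, so the serialized
 * field names must equal the ClickHouse column names.
 */
@Data
public class LogLogbackVo {
    private String id;

    @JSONField(name = "sys_name")
    private String sysName;

    private String level;

    private String msg;

    private String thread;

    // ClickHouse DateTime columns accept "yyyy-MM-dd HH:mm:ss"-style strings via JSONEachRow.
    @JSONField(name = "create_date")
    private String createDate;

    @JSONField(name = "exe_date")
    private String exeDate;
}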
2) pom.xml of the custom Flume sink:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
3) Core class of the custom Flume sink:
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);

private BalancedClickhouseDataSource dataSource = null;
private SinkCounter sinkCounter = null;
private String host = null;
private String port = null;
private String user = null;
private String password = null;
private String database = null;
private String table = null;
private int batchSize;

@Override
public Status process() throws EventDeliveryException {
    Status status = null;
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    List<LogLogbackVo> insertData = new ArrayList<>();
    try {
        ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
        int count;
        // Drain up to batchSize events from the channel and convert each one into a VO.
        for (count = 0; count < batchSize; ++count) {
            Event event = ch.take();
            if (event == null) {
                break;
            }
            insertData.add(StringUtil.buildLog(new String(event.getBody())));
        }
        if (count <= 0) {
            sinkCounter.incrementBatchEmptyCount();
            txn.commit();
            return Status.BACKOFF;
        } else if (count < batchSize) {
            sinkCounter.incrementBatchUnderflowCount();
        } else {
            sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(count);

        // Bulk-insert the whole batch in one request using ClickHouse's JSONEachRow format.
        ClickHouseStatement sth = conn.createStatement();
        sth.write()
                .table(String.format(" %s.%s", database, table))
                .data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()), ClickHouseFormat.JSONEachRow)
                .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
                .send();

        sinkCounter.incrementEventDrainSuccessCount();
        status = Status.READY;
        txn.commit();
    } catch (Throwable t) {
        txn.rollback();
        LOGGER.error(t.getMessage(), t);
        status = Status.BACKOFF;
        // re-throw all Errors
        if (t instanceof Error) {
            throw (Error) t;
        }
    } finally {
        txn.close();
    }
    return status;
}

@Override
public void configure(Context context) {
    if (sinkCounter == null) {
        sinkCounter = new SinkCounter(getName());
    }
    Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0,
            "ClickHouse host must be specified!");
    this.host = context.getString(HOST);
    if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
        this.host = CLICK_HOUSE_PREFIX + this.host;
    }
    Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0,
            "ClickHouse database must be specified!");
    this.database = context.getString(DATABASE);
    Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0,
            "ClickHouse table must be specified!");
    this.table = context.getString(TABLE);
    this.port = context.getString(PORT, DEFAULT_PORT);
    this.user = context.getString(USER, DEFAULT_USER);
    this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
    this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
}

@Override
public void start() {
    LOGGER.info("clickHouse sink {} starting", getName());
    String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
    ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
    this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
    sinkCounter.start();
    super.start();
    LOGGER.info("clickHouse sink {} started", getName());
}

@Override
public void stop() {
    LOGGER.info("clickHouse sink {} stopping", getName());
    sinkCounter.incrementConnectionClosedCount();
    sinkCounter.stop();
    super.stop();
    LOGGER.info("clickHouse sink {} stopped", getName());
}
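The sink above delegates to two helpers from the author's project that are not shown in the article: StringUtil.buildLog, which turns a raw log line into a LogLogbackVo, and JsonUtil.t2JsonString, which serializes the batch. The sketch below is only a hypothetical stand-in for those helpers, assuming the JSON log lines produced by the logback pattern in step 5 and the hypothetical VO sketched earlier; the real implementations may differ.

import java.util.List;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Hypothetical equivalents of StringUtil.buildLog / JsonUtil.t2JsonString;
 * not the author's actual code.
 */
public final class LogHelperSketch {

    /** Parse one JSON log line (as produced by the logback pattern in step 5) into a VO. */
    public static LogLogbackVo buildLog(String line) {
        JSONObject json = JSON.parseObject(line);
        LogLogbackVo vo = new LogLogbackVo();
        vo.setId(UUID.randomUUID().toString());      // the table's id column is not present in the log line
        vo.setSysName(json.getString("sysName"));
        vo.setThread(json.getString("thread"));
        vo.setLevel(json.getString("level"));
        vo.setMsg(json.getString("msg"));
        vo.setExeDate(json.getString("exeDate"));
        vo.setCreateDate(json.getString("exeDate")); // assumption: reuse exeDate as create_date
        return vo;
    }

    /** Serialize the batch as newline-delimited JSON, the layout JSONEachRow expects. */
    public static String toJsonEachRow(List<LogLogbackVo> rows) {
        StringBuilder sb = new StringBuilder();
        for (LogLogbackVo row : rows) {
            sb.append(JSON.toJSONString(row)).append('\n');
        }
        return sb.toString();
    }

    private LogHelperSketch() {
    }
}

Note that JSONEachRow expects one JSON object per line, and depending on your ClickHouse version the millisecond part of exeDate may need to be trimmed before it is inserted into a DateTime column.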
4) Corresponding Flume configuration:
# Agent component names
a1.sources = r1
a1.sinks = sink1
a1.channels = c1

# Source: spooling directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/spark/flume/data/log
a1.sources.r1.channels = c1
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Sink: the custom ClickHouse sink
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1
5) Logback configuration of the application:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log levels from low to high: TRACE < DEBUG < INFO < WARN < ERROR < FATAL. If the level is set to WARN, anything below WARN is not output. -->
<!-- scan: when true, the configuration file is reloaded whenever it changes; defaults to true -->
<!-- scanPeriod: how often to check the configuration file for changes; the unit defaults to milliseconds if none is given; only effective when scan is true; defaults to 1 minute -->
<!-- debug: when true, logback prints its internal status information; defaults to false -->
<configuration scan="true" scanPeriod="7 seconds">
    <contextName>logback</contextName>
    <!-- System name -->
    <property name="sysName" value="frameSimple"/>
    <!-- Maximum log file size (KB/MB) -->
    <property name="logMaxFileSize" value="1KB"/>
    <!-- Days of log history to keep -->
    <property name="logMaxHistory" value="7"/>
    <property name="logging.path" value="sysLog"/>

    <!-- Colored logs -->
    <!-- Converter classes required for colored output -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
    <!-- Colored console log pattern -->
    <property name="CONSOLE_LOG_PATTERN"
              value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

    <!-- Console output -->
    <appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
        <!-- For development use; the console only receives events at or above this level -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <!-- Character set -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- File output -->
    <!-- Rolling appender for DEBUG-level logs -->
    <appender name="DEBUG_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/debug_${sysName}.log</file>
        <!-- Log output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Archive location -->
            <fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of log history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records DEBUG-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Rolling appender for INFO-level logs -->
    <appender name="INFO_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/info_${sysName}.log</file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Daily archive path and naming pattern -->
            <fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of log history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records INFO-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Rolling appender for WARN-level logs -->
    <appender name="WARN_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/warn_${sysName}.log</file>
        <!-- Log output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of log history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records WARN-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Rolling appender for ERROR-level logs -->
    <appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/error_${sysName}.log</file>
        <!-- Log output format -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of log history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file only records ERROR-level events -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE_APPENDER"/>
        <appender-ref ref="DEBUG_APPENDER"/>
        <appender-ref ref="INFO_APPENDER"/>
        <appender-ref ref="WARN_APPENDER"/>
        <appender-ref ref="ERROR_APPENDER"/>
    </root>
</configuration>
To sum up, the system logs are first written to the info, error, and warn log files and are then rolled, based on file size, into the info, error, and warn directories. So we only need to have Flume monitor the info, error, and warn directories.
Copyright notice: this article is an original piece by InfoQ author 柠檬.
Original link: http://xie.infoq.cn/article/6c71ebe26b990283ef228765b
This article is published under the CC BY-NC license; please keep the original link and this copyright notice when reposting.