Running Our Spring Boot Services on k8s: Syncing Data to ClickHouse with Flume




A brief introduction to ClickHouse (see the official documentation for a full treatment):



1) ClickHouse is not part of the Hadoop ecosystem.

2) It is queried with SQL, so getting started is relatively easy for anyone familiar with relational databases.

3) ClickHouse is best used for reads; avoid updating data in place, and perform writes in batches (see the JDBC sketch just below).

4) All kinds of log data can be synced into ClickHouse with Flume for centralized management and user-behavior analysis.

5) For incremental syncing from MySQL to ClickHouse, one line of thinking: immutable data such as system logs, transaction logs, user-behavior logs and completed orders can be synced into ClickHouse for reporting, statistics and analysis.

Since users mostly query and update recent or brand-new data, the mutable, transaction-sensitive part can stay in MySQL. By syncing MySQL data into ClickHouse, recent and changing data is handled in MySQL while the bulk of the historical data is handled in ClickHouse, which relieves the performance pressure on the relational database.

6) Faced with many heterogeneous data sources, Flink can be used to funnel the data into ClickHouse in a unified way.



All of the above should be validated against your actual workload; after all, practice is the sole criterion of truth.
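To illustrate point 3 (batch the writes), here is a minimal sketch using the clickhouse-jdbc driver that appears in the pom below. The connection details (localhost:8123, database default, user default with an empty password) and the sys_log table are assumptions taken from the rest of this post.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

/**
 * Minimal sketch: insert log rows into ClickHouse in one batch instead of row by row.
 */
public class BatchInsertSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://localhost:8123/default";
        try (Connection conn = DriverManager.getConnection(url, "default", "");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO sys_log (id, sys_name, level, msg, thread, create_date, exe_date) "
                             + "VALUES (?, ?, ?, ?, ?, ?, ?)")) {
            for (int i = 0; i < 1000; i++) {
                ps.setString(1, String.valueOf(i));
                ps.setString(2, "frameSimple");
                ps.setString(3, "INFO");
                ps.setString(4, "demo message " + i);
                ps.setString(5, "main");
                ps.setTimestamp(6, new Timestamp(System.currentTimeMillis()));
                ps.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
                ps.addBatch();      // accumulate rows locally
            }
            ps.executeBatch();      // send them to ClickHouse as a single insert
        }
    }
}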



A brief introduction to Flume (again, see the official documentation for details):

1) A highly available, highly reliable, distributed system for collecting, aggregating and transporting large volumes of log data.

2) It can watch files or directories of many types in many ways, pick up the changed data and push it to different downstream middleware.

3) Flume ships with a range of plugins for the needs in (2); common examples are a source that listens on a TCP port and a source that watches log files in a directory.

4) Flume can push the changed data into a queue, which then distributes it to the various data-warehouse middleware, or it can skip the queue and write straight to the warehouse.

5) We can also write our own Flume NG components to cover data-sync requirements that the built-in ones do not.



This post covers two things:

1. Implementing a custom Flume NG sink.

2. Using the sink from (1) to sync the logs produced by our system into ClickHouse.



The core code follows.



1) Create the table in ClickHouse:

CREATE TABLE default.sys_log (
`id` String,
`sys_name` String,
`level` String,
`msg` String,
`thread` String,
`create_date` DateTime,
`exe_date` DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
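Once rows start arriving, a quick sanity check is to count log entries per day and level. This is only a sketch; it re-uses the same assumed connection details as the batch-insert example above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Minimal verification sketch: count synced log rows per day and level.
 */
public class QuerySketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://localhost:8123/default";
        try (Connection conn = DriverManager.getConnection(url, "default", "");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "SELECT toDate(create_date) AS d, level, count() AS cnt "
                             + "FROM sys_log GROUP BY d, level ORDER BY d, level")) {
            while (rs.next()) {
                System.out.printf("%s %-5s %d%n",
                        rs.getDate("d"), rs.getString("level"), rs.getLong("cnt"));
            }
        }
    }
}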



2) pom.xml for the custom Flume sink (the version-less dependencies are presumably managed by a parent POM):

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
</dependency>

<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>4.2</version>
</dependency>

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

</dependencies>



3) Core class of the custom Flume sink. Only the core members are shown here; the package declaration, imports, constants (HOST, PORT, the DEFAULT_* values, etc.) and the helpers StringUtil.buildLog and JsonUtil.t2JsonString live in the repository linked at the end. The class follows the usual custom-sink pattern: it extends Flume's AbstractSink and implements Configurable.

private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);
// ClickHouse connection pool and Flume's built-in sink metrics counter
private BalancedClickhouseDataSource dataSource = null;
private SinkCounter sinkCounter = null;
// connection settings, populated from the agent configuration in configure()
private String host = null;
private String port = null;
private String user = null;
private String password = null;
private String database = null;
private String table = null;
// how many events to take from the channel per transaction
private int batchSize;

@Override
public Status process() throws EventDeliveryException {
    Status status = null;
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    List<LogLogbackVo> insertData = new ArrayList<>();
    try {
        ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
        // drain up to batchSize events from the channel inside one transaction
        int count;
        for (count = 0; count < batchSize; ++count) {
            Event event = ch.take();
            if (event == null) {
                break;
            }
            insertData.add(StringUtil.buildLog(new String(event.getBody())));
        }
        if (count <= 0) {
            // nothing to send: back off so Flume does not spin on an empty channel
            sinkCounter.incrementBatchEmptyCount();
            txn.commit();
            return Status.BACKOFF;
        } else if (count < batchSize) {
            sinkCounter.incrementBatchUnderflowCount();
        } else {
            sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(count);
        // stream the whole batch to ClickHouse as JSONEachRow in a single write
        ClickHouseStatement sth = conn.createStatement();
        sth.write()
                .table(String.format(" %s.%s", database, table))
                .data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()), ClickHouseFormat.JSONEachRow)
                .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
                .send();
        sinkCounter.incrementEventDrainSuccessCount();
        status = Status.READY;
        txn.commit();
    } catch (Throwable t) {
        // roll back so the events stay in the channel and can be retried
        txn.rollback();
        LOGGER.error(t.getMessage(), t);
        status = Status.BACKOFF;
        // re-throw all Errors
        if (t instanceof Error) {
            throw (Error) t;
        }
    } finally {
        txn.close();
    }
    return status;
}

@Override
public void configure(Context context) {
    if (sinkCounter == null) {
        sinkCounter = new SinkCounter(getName());
    }

    Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0,
            "ClickHouse host must be specified!");
    this.host = context.getString(HOST);
    // prepend the JDBC prefix if the agent config only gave a bare host
    if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
        this.host = CLICK_HOUSE_PREFIX + this.host;
    }

    Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0,
            "ClickHouse database must be specified!");
    this.database = context.getString(DATABASE);
    Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0,
            "ClickHouse table must be specified!");
    this.table = context.getString(TABLE);
    // optional settings fall back to defaults
    this.port = context.getString(PORT, DEFAULT_PORT);
    this.user = context.getString(USER, DEFAULT_USER);
    this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
    this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
}


@Override
public void start() {
    LOGGER.info("clickHouse sink {} starting", getName());
    String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
    ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
    this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
    sinkCounter.start();
    super.start();
    LOGGER.info("clickHouse sink {} started", getName());
}

@Override
public void stop() {
    LOGGER.info("clickHouse sink {} stopping", getName());
    sinkCounter.incrementConnectionClosedCount();
    sinkCounter.stop();
    super.stop();
    LOGGER.info("clickHouse sink {} stopped", getName());
}
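The sink serializes a list of LogLogbackVo objects into the JSONEachRow format, so the JSON keys must match the column names of default.sys_log. The real class lives in the repository linked at the end; below is only a hypothetical sketch of what it might look like, using the fastjson and Lombok dependencies already declared in the pom (field names, annotations and date format are assumptions).

import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;

/**
 * Hypothetical sketch of the payload written as JSONEachRow.
 * JSON keys mirror the sys_log columns; DateTime values are sent as
 * "yyyy-MM-dd HH:mm:ss" strings, which ClickHouse can parse.
 */
@Data
public class LogLogbackVo {
    private String id;
    @JSONField(name = "sys_name")
    private String sysName;
    private String level;
    private String msg;
    private String thread;
    @JSONField(name = "create_date", format = "yyyy-MM-dd HH:mm:ss")
    private java.util.Date createDate;
    @JSONField(name = "exe_date", format = "yyyy-MM-dd HH:mm:ss")
    private java.util.Date exeDate;
}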

4) The corresponding Flume agent configuration:

# name the agent's components
a1.sources = r1
a1.sinks = sink1
a1.channels = c1

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/spark/flume/data/log
a1.sources.r1.channels=c1
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# the custom ClickHouse sink
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =
# the channel; with a memory channel, transactionCapacity must be at least the
# sink's batchSize (10000 here), otherwise the sink cannot take a full batch
# inside a single transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1



5) The application's logback configuration

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log levels from low to high: TRACE < DEBUG < INFO < WARN < ERROR < FATAL; if set to WARN, anything below WARN is not output -->
<!-- scan: when true, the configuration file is reloaded if it changes; default is true -->
<!-- scanPeriod: how often to check the configuration file for changes; the unit defaults to milliseconds if none is given; only effective when scan is true; the default interval is 1 minute -->
<!-- debug: when true, logback prints its internal status messages; default is false -->
<configuration scan="true" scanPeriod="7 seconds">
<contextName>logback</contextName>

<!-- system name -->
<property name="sysName" value="frameSimple"/>
<!-- maximum log file size (KB, MB) -->
<property name="logMaxFileSize" value="1KB"/>
<!-- number of days to keep log files -->
<property name="logMaxHistory" value="7"/>

<property name="logging.path" value="sysLog"/>

<!-- colored console logs -->
<!-- converters the colored pattern depends on -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- colored log pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

<!-- console output -->
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<!-- this appender is meant for development: the console shows entries at or above the configured threshold -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<!-- character set -->
<charset>UTF-8</charset>
</encoder>
</appender>


<!-- file output -->
<!-- rolling DEBUG log file -->
<appender name="DEBUG_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- path and name of the active log file -->
<file>${logging.path}/debug_${sysName}.log</file>
<!-- log file output format -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<jsonGeneratorDecorator
class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
<providers>
<pattern>
<pattern>
{
"sysName":"${sysName}",
"thread":"%thread",
"exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
"level":"%level",

"msg": "%msg"
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- archive file name pattern -->
<fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${logMaxFileSize}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- number of days to keep log files -->
<maxHistory>${logMaxHistory}</maxHistory>
</rollingPolicy>
<!-- this file only records DEBUG-level entries -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<!-- rolling INFO log file -->
<appender name="INFO_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- path and name of the active log file -->
<file>${logging.path}/info_${sysName}.log</file>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<jsonGeneratorDecorator
class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
<providers>
<pattern>
<pattern>
{
"sysName":"${sysName}",
"thread":"%thread",
"exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
"level":"%level",

"msg": "%msg"
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily archive path and file name pattern -->
<fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${logMaxFileSize}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- number of days to keep log files -->
<maxHistory>${logMaxHistory}</maxHistory>
</rollingPolicy>
<!-- this file only records INFO-level entries -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<!-- rolling WARN log file -->
<appender name="WARN_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- path and name of the active log file -->
<file>${logging.path}/warn_${sysName}.log</file>
<!-- log file output format -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<jsonGeneratorDecorator
class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
<providers>
<pattern>
<pattern>
{
"sysName":"${sysName}",
"thread":"%thread",
"exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
"level":"%level",

"msg": "%msg"
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${logMaxFileSize}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- number of days to keep log files -->
<maxHistory>${logMaxHistory}</maxHistory>
</rollingPolicy>
<!-- this file only records WARN-level entries -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>WARN</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>


<!-- rolling ERROR log file -->
<appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- path and name of the active log file -->
<file>${logging.path}/error_${sysName}.log</file>
<!-- log file output format -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<jsonGeneratorDecorator
class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
<providers>
<pattern>
<pattern>
{
"sysName":"${sysName}",
"thread":"%thread",
"exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
"level":"%level",

"msg": "%msg"
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${logMaxFileSize}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- number of days to keep log files -->
<maxHistory>${logMaxHistory}</maxHistory>
</rollingPolicy>
<!-- this file only records ERROR-level entries -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE_APPENDER"/>
<appender-ref ref="DEBUG_APPENDER"/>
<appender-ref ref="INFO_APPENDER"/>
<appender-ref ref="WARN_APPENDER"/>
<appender-ref ref="ERROR_APPENDER"/>
</root>

</configuration>



To sum up: the application logs are first written to the active info, error and warn log files, and as they reach the size limit they are rolled into the info, error and warn archive directories. Because the spooling-directory source only processes files that are no longer being written to, it is enough to point Flume at those archive directories.



For the full implementation, see https://gitee.com/lvmoney/zhy-frame-parent
