写点什么

Flink 自定义 Avro 序列化 (Source/Sink) 到 kafka 中

发布于: 2021 年 01 月 09 日
Flink 自定义Avro序列化(Source/Sink)到kafka中

前言

最近一直在研究如果提高 kafka 中读取效率,之前一直使用字符串的方式将数据写入到 kafka 中。当数据将特别大的时候发现效率不是很好,偶然之间接触到了Avro序列化,发现 kafka 也是支持 Avro 的方式于是就有了本篇文章。

环境所依赖的 pom 文件

 <dependencies>        <dependency>            <groupId>org.apache.avro</groupId>            <artifactId>avro</artifactId>            <version>1.8.2</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-scala_2.12</artifactId>            <version>1.10.1</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-scala_2.12</artifactId>            <version>1.10.1</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>            <version>1.10.1</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-avro</artifactId>            <version>1.10.1</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>1.0.0</version>        </dependency>        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-streams</artifactId>            <version>1.0.0</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.apache.avro</groupId>                <artifactId>avro-maven-plugin</artifactId>                <version>1.8.2</version>                <executions>                    <execution>                        <phase>generate-sources</phase>                        <goals>                            <goal>schema</goal>                        </goals>                        <configuration>                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>                        </configuration>                    </execution>                </executions>            </plugin>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.6</source>                    <target>1.6</target>                </configuration>            </plugin>        </plugins>    </build>
复制代码

一、Avro 提供的技术支持包括以下五个方面:

  • 优秀的数据结构;

  • 一个紧凑的,快速的,二进制数据格式;

  • 一个容器文件,用来存储持久化数据;

  • RPC 远程过程调用;

  • 集成最简单的动态语言。读取或者写入数据文件,使用或实现 RPC 协议均不需要代码实现。对于静态- - 语言编写的话需要实现;

二、Avro 优点

  • 二进制消息,性能好/效率高

  • 使用 JSON 描述模式

  • 模式和数据统一存储,消息自描述,不需要生成 stub 代码(支持生成 IDL)

  • RPC 调用在握手阶段交换模式定义

  • 包含完整的客户端/服务端堆栈,可快速实现 RPC

  • 支持同步和异步通信

  • 支持动态消息

  • 模式定义允许定义数据的排序(序列化时会遵循这个顺序)

  • 提供了基于 Jetty 内核的服务基于 Netty 的服务


三、Avro Json 格式介绍

{    "namespace": "com.avro.bean",    "type": "record",    "name": "UserBehavior",    "fields": [        {"name": "userId", "type": "long"},        {"name": "itemId",  "type": "long"},        {"name": "categoryId", "type": "int"},        {"name": "behavior", "type": "string"},        {"name": "timestamp", "type": "long"}    ]}
复制代码
  • namespace : 要生成的目录

  • type : 类型 avro 使用 record

  • name : 会自动生成对应的对象

  • fields : 要指定的字段


注意: 创建的文件后缀名一定要叫 avsc


我们使用 idea 生成 UserBehavior 对象

四、使用 Java 自定义序列化到 kafka

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 首先我们先使用 Java 编写 Kafka 客户端写入数据和消费数据。

4.1 准备测试数据

543462,1715,1464116,pv,1511658000

662867,2244074,1575622,pv,1511658000

561558,3611281,965809,pv,1511658000

894923,3076029,1879194,pv,1511658000

834377,4541270,3738615,pv,1511658000

315321,942195,4339722,pv,1511658000

625915,1162383,570735,pv,1511658000

4.2 自定义 Avro 序列化和反序列化

首先我们需要实现 2 个类分别为SerializerDeserializer分别是序列化和反序列化

package com.avro.AvroUtil;
import com.avro.bean.UserBehavior;import org.apache.avro.io.BinaryDecoder;import org.apache.avro.io.BinaryEncoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.io.EncoderFactory;import org.apache.avro.specific.SpecificDatumReader;import org.apache.avro.specific.SpecificDatumWriter;import org.apache.kafka.common.serialization.Deserializer;import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.Map;
/** * @author 大数据老哥 * @version V1.0 * @Package com.avro.AvroUtil * @File :SimpleAvroSchemaJava.java * @date 2021/1/8 20:02 *//** * 自定义序列化和反序列化 */public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> { @Override public void configure(Map<String, ?> map, boolean b) {
} //序列化方法 @Override public byte[] serialize(String s, UserBehavior userBehavior) { // 创建序列化执行器 SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema()); // 创建一个流 用存储序列化后的二进制文件 ByteArrayOutputStream out = new ByteArrayOutputStream(); // 创建二进制编码器 BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); try { // 数据入都流中 writer.write(userBehavior, encoder); } catch (IOException e) { e.printStackTrace(); }
return out.toByteArray(); }
@Override public void close() {
}
//反序列化 @Override public UserBehavior deserialize(String s, byte[] bytes) { // 用来保存结果数据 UserBehavior userBehavior = new UserBehavior(); // 创建输入流用来读取二进制文件 ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes); // 创建输入序列化执行器 SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema()); //创建二进制解码器 BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null); try { // 数据读取 userBehavior=stockSpecificDatumReader.read(null, binaryDecoder); } catch (IOException e) { e.printStackTrace(); } // 结果返回 return userBehavior; }}
复制代码


4.3 创建序列化对象


package com.avro.kafka;import com.avro.bean.UserBehavior;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader;import java.io.FileReader;import java.util.ArrayList;import java.util.List;import java.util.Properties;
/** * @author 大数据老哥 * @version V1.0 * @Package com.avro.kafka * @File :UserBehaviorProducerKafka.java * @date 2021/1/8 20:14 */
public class UserBehaviorProducerKafka { public static void main(String[] args) throws InterruptedException { // 获取数据 List<UserBehavior> data = getData(); // 创建配置文件 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava"); // 创建kafka的生产者 KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<String, UserBehavior>(props); // 循环遍历数据 for (UserBehavior userBehavior : data) { ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior); userBehaviorProducer.send(producerRecord); System.out.println("数据写入成功"+data); Thread.sleep(1000); } }
public static List<UserBehavior> getData() { ArrayList<UserBehavior> userBehaviors = new ArrayList<UserBehavior>(); try { BufferedReader br = new BufferedReader(new FileReader(new File("data/UserBehavior.csv"))); String line = ""; while ((line = br.readLine()) != null) { String[] split = line.split(","); userBehaviors.add( new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]))); } } catch (Exception e) { e.printStackTrace(); } return userBehaviors; }}
复制代码

注意:value.serializer 一定要指定我们自己写好的那个反序列化类,负责会无效

4.4 创建反序列化对象

package com.avro.kafka;import com.avro.bean.UserBehavior;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;
/** * @author 大数据老哥 * @version V1.0 * @Package com.avro.kafka * @File :UserBehaviorConsumer.java * @date 2021/1/8 20:58 */public class UserBehaviorConsumer {
public static void main(String[] args) { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092"); prop.put("group.id", "UserBehavior"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 设置反序列化类为自定义的avro反序列化类 prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava"); KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(prop); consumer.subscribe(Arrays.asList("UserBehaviorKafka")); while (true) { ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000); for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) { System.out.println(stringStockConsumerRecord.value()); } } }}
复制代码

4.5 启动运行

创建 kafkaTopic 和启动一个消费者

# 创建topic./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka# 模拟消费者./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2node03:2181
复制代码


五、Flink 实现 Avro 自定义序列化到 Kafka

到这里好多小伙们就说我 Java 实现了那 Flink 不就改一下 Consumer 和 Producer 不就完了吗?

5.1 准备数据

543462,1715,1464116,pv,1511658000

662867,2244074,1575622,pv,1511658000

561558,3611281,965809,pv,1511658000

894923,3076029,1879194,pv,1511658000

834377,4541270,3738615,pv,1511658000

315321,942195,4339722,pv,1511658000

625915,1162383,570735,pv,1511658000

5.2 创建 Flink 自定义 Avro 序列化和反序列化

当我们创建 FlinkKafka 连接器的时候发现使用 Java 那个类序列化发现不行,于是我们改为了系统自带的那个类进行测试。点击源码查看发系统自带的那个 String 其实实现的是DeserializationSchemaSerializationSchema,那我们是不是也可以模仿一个那?



package com.avro.AvroUtil;
import com.avro.bean.UserBehavior;import com.typesafe.sslconfig.ssl.FakeChainedKeyStore;import org.apache.avro.io.BinaryDecoder;import org.apache.avro.io.BinaryEncoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.io.EncoderFactory;import org.apache.avro.specific.SpecificDatumReader;import org.apache.avro.specific.SpecificDatumWriter;import org.apache.flink.api.common.serialization.DeserializationSchema;import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.kafka.common.serialization.Deserializer;import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.Map;
/** * @author 大数据老哥 * @version V1.0 * @Package com.avro.AvroUtil * @File :SimpleAvroSchemaFlink.java * @date 2021/1/8 20:02 */
/** * 自定义序列化和反序列化 */public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {
@Override public byte[] serialize(UserBehavior userBehavior) { // 创建序列化执行器 SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema()); // 创建一个流 用存储序列化后的二进制文件 ByteArrayOutputStream out = new ByteArrayOutputStream(); // 创建二进制编码器 BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); try { // 数据入都流中 writer.write(userBehavior, encoder); } catch (IOException e) { e.printStackTrace(); }
return out.toByteArray(); }
@Override public TypeInformation<UserBehavior> getProducedType() { return TypeInformation.of(UserBehavior.class); }
@Override public UserBehavior deserialize(byte[] bytes) throws IOException { // 用来保存结果数据 UserBehavior userBehavior = new UserBehavior(); // 创建输入流用来读取二进制文件 ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes); // 创建输入序列化执行器 SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema()); //创建二进制解码器 BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null); try { // 数据读取 userBehavior=stockSpecificDatumReader.read(null, binaryDecoder); } catch (IOException e) { e.printStackTrace(); } // 结果返回 return userBehavior; }
@Override public boolean isEndOfStream(UserBehavior userBehavior) { return false; }}
复制代码

5.3 创建 Flink Comsumer 反序列化

package com.avro.FlinkKafka
import com.avro.AvroUtil.{SimpleAvroSchemaFlink}import com.avro.bean.UserBehaviorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
/** * @Package com.avro.FlinkKafka * @File :UserBehaviorConsumerFlink.java * @author 大数据老哥 * @date 2021/1/8 21:18 * @version V1.0 */object UserBehaviorConsumerFlink { def main(args: Array[String]): Unit = { //1.构建流处理运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度1 方便后面测试 // 2.设置kafka 配置信息 val prop = new Properties prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092") prop.put("group.id", "UserBehavior") prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // 设置反序列化类为自定义的avro反序列化类 prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")
// val kafka: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("UserBehaviorKafka", new SimpleStringSchema(), prop) // 3.构建Kafka 连接器 val kafka: FlinkKafkaConsumer011[UserBehavior] = new FlinkKafkaConsumer011[UserBehavior]("UserBehavior", new SimpleAvroSchemaFlink(), prop)
//4.设置Flink层最新的数据开始消费 kafka.setStartFromLatest() //5.基于kafka构建数据源 val data: DataStream[UserBehavior] = env.addSource(kafka) //6.结果打印 data.print() env.execute("UserBehaviorConsumerFlink") }}
复制代码

5.4 创建 Flink Producer 序列化

package com.avro.FlinkKafka
import com.avro.AvroUtil.SimpleAvroSchemaFlinkimport com.avro.bean.UserBehaviorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import java.util.Properties
/** * @Package com.avro.FlinkKafka * @File :UserBehaviorProducerFlink.java * @author 大数据老哥 * @date 2021/1/8 21:38 * @version V1.0 */object UserBehaviorProducerFlink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val value = env.readTextFile("./data/UserBehavior.csv") val users: DataStream[UserBehavior] = value.map(row => { val arr = row.split(",") val behavior = new UserBehavior() behavior.setUserId(arr(0).toLong) behavior.setItemId(arr(1).toLong) behavior.setCategoryId(arr(2).toInt) behavior.setBehavior(arr(3)) behavior.setTimestamp(arr(4).toLong) behavior }) val prop = new Properties() prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092") //4.连接Kafka val producer: FlinkKafkaProducer011[UserBehavior] = new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop) //5.将数据打入kafka users.addSink(producer) //6.执行任务 env.execute("UserBehaviorProducerFlink") }}
复制代码

5.5 启动运行

需要源码的请去 GitHub 自行下载 https://github.com/lhh2002/Flink_Avro


小结

其实我在实现这个功能的时候也是蒙的,不会难道就不学了吗,肯定不是呀。我在 5.2 提出的那个问题的时候其实是我自己亲身经历过的。首先遇到了问题不要想着怎么放弃,而是想想怎么解决,当时我的思路看源码看别人写的。最后经过不懈的努力也终成功了,我在这里为大家提供Flink面试题需要的朋友可以去下面 GitHub 去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

>资源获取 获取 Flink 面试题,Spark 面试题,程序员必备软件,hive 面试题,Hadoop 面试题,Docker 面试题,简历模板等资源请去

>GitHub 自行下载 https://github.com/lhh2002/Framework-Of-BigData

>Gitee 自行下载 https://gitee.com/liheyhey/dashboard/projects


发布于: 2021 年 01 月 09 日阅读数: 40
用户头像

微信搜公众号【大数据老哥】 2021.01.03 加入

微信搜索公众号【大数据老哥】 自己GitHub【https://github.com/lhh2002】 欢迎来star

评论

发布
暂无评论
Flink 自定义Avro序列化(Source/Sink)到kafka中