
Sending CSV data to Kafka (Java edition)

  • November 11, 2021
  • Word count: 3,019

    Reading time: about 10 minutes

Each record in the dataset contains the following fields:

| Field | Description |
| :-- | :-- |
| User ID | Integer; the serialized user ID |
| Item ID | Integer; the serialized item ID |
| Item category ID | Integer; the serialized ID of the category the item belongs to |
| Behavior type | String; an enum with the values ('pv', 'buy', 'cart', 'fav') |
| Timestamp | The timestamp at which the behavior occurred |
| Time string | A time string generated from the timestamp field |
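To make the field list concrete, here is a small stdlib-only sketch (the record values are hypothetical, not taken from the real dataset) showing how one CSV record splits into its fields and how the epoch-seconds timestamp maps to the time string, using the same pattern the UserBehavior bean later applies:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CsvRecordDemo {

    // Split one CSV record into its comma-separated fields
    static String[] parseLine(String line) {
        return line.split(",");
    }

    // Derive the "time string" field from the epoch-seconds timestamp (UTC)
    static String toTimeString(long epochSeconds) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        // hypothetical record: user ID, item ID, category ID, behavior, timestamp
        String[] fields = parseLine("543462,1715,1464116,pv,1511658000");
        System.out.println(toTimeString(Long.parseLong(fields[4]))); // prints 2017-11-26T01:00:00Z
    }
}
```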


  1. For details about this dataset, see 《准备数据集用于flink学习》 (preparing a dataset for Flink practice).

Overview of the Java application

Before writing any code, let's list exactly what needs to be built, then implement each item:


  1. UserBehaviorCsvFileReader: utility class that reads records from the CSV file

  2. UserBehavior: bean class corresponding to each record

  3. JsonSerializer: serialization class that turns Java objects into JSON

  4. KafkaProducer: utility class that sends messages to Kafka

  5. SendMessageApplication: application class, the program entry point


These five classes are all the Java application needs, so let's start coding.
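The reader will be used as a java.util.function.Supplier and the producer as a Consumer, wired together with the Java 8 Stream API. A minimal stdlib-only sketch of that pattern, with hypothetical stand-ins for the real classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class PipelineSketch {

    // Run the Supplier-to-Consumer pipeline: keep pulling records until the
    // supplier signals exhaustion with NoSuchElementException.
    static List<String> run(List<String> input) {
        Iterator<String> records = input.iterator();

        // Stand-in for UserBehaviorCsvFileReader
        Supplier<String> reader = () -> {
            if (!records.hasNext()) {
                throw new NoSuchElementException("all records read");
            }
            return records.next();
        };

        // Stand-in for KafkaProducer: just collects what it is asked to send
        List<String> sent = new ArrayList<>();
        Consumer<String> producer = sent::add;

        try {
            Stream.generate(reader).forEach(producer);
        } catch (NoSuchElementException e) {
            // end of input reached; the real application stops the same way
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("pv", "buy", "cart"))); // prints [pv, buy, cart]
    }
}
```

Stream.generate produces an unbounded stream, so the supplier's NoSuchElementException is what terminates the run once the CSV file is exhausted.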

Download the source directly

  1. If you would rather not write the code, you can download the project source straight from GitHub; the addresses and link information are listed in the table below:


| Name | Link | Notes |
| :-- | :-- | :-- |
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, ssh protocol |


  2. This git project contains multiple folders; the source for this chapter is under the flinksql folder (marked with a red box in the screenshot in the original post).


Coding

  1. Create a Maven project with the pom.xml below; the important dependencies are jackson and javacsv:


```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bolingcavalry</groupId>
    <artifactId>flinksql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <kafka.version>2.2.0</kafka.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sourceforge.javacsv</groupId>
            <artifactId>javacsv</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```


  2. UserBehaviorCsvFileReader, the utility class that reads records from the CSV file. The main program will later use the Java 8 Stream API to process the records, so UserBehaviorCsvFileReader implements the Supplier interface:


```java
import com.csvreader.CsvReader;

import java.io.IOException;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {

    private final String filePath;
    private CsvReader csvReader;

    public UserBehaviorCsvFileReader(String filePath) throws IOException {
        this.filePath = filePath;

        try {
            csvReader = new CsvReader(filePath);
            csvReader.readHeaders();
        } catch (IOException e) {
            throw new IOException("Error reading records from file: " + filePath, e);
        }
    }

    @Override
    public UserBehavior get() {
        UserBehavior userBehavior = null;

        try {
            if (csvReader.readRecord()) {
                csvReader.getRawRecord();
                userBehavior = new UserBehavior(
                        Long.valueOf(csvReader.get(0)),
                        Long.valueOf(csvReader.get(1)),
                        Long.valueOf(csvReader.get(2)),
                        csvReader.get(3),
                        // the CSV stores epoch seconds; Date wants milliseconds
                        new Date(Long.valueOf(csvReader.get(4)) * 1000L));
            }
        } catch (IOException e) {
            throw new NoSuchElementException("IOException from " + filePath);
        }

        if (null == userBehavior) {
            throw new NoSuchElementException("All records read from " + filePath);
        }

        return userBehavior;
    }
}
```


  3. UserBehavior, the bean class corresponding to each record; its fields simply mirror the CSV record format. The ts field, which holds the time, carries a JsonFormat annotation that controls how it is rendered during serialization:


```java
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Date;

public class UserBehavior {

    @JsonFormat
    private long user_id;

    @JsonFormat
    private long item_id;

    @JsonFormat
    private long category_id;

    @JsonFormat
    private String behavior;

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public UserBehavior() {
    }

    public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
        this.user_id = user_id;
        this.item_id = item_id;
        this.category_id = category_id;
        this.behavior = behavior;
        this.ts = ts;
    }

    // getters so Jackson's default introspection can see the private fields
    public long getUser_id() { return user_id; }

    public long getItem_id() { return item_id; }

    public long getCategory_id() { return category_id; }

    public String getBehavior() { return behavior; }

    public Date getTs() { return ts; }
}
```


  4. JsonSerializer, the serialization class that turns Java objects into JSON:


```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> {

    private final ObjectMapper jsonMapper = new ObjectMapper();

    public String toJSONString(T r) {
        try {
            return jsonMapper.writeValueAsString(r);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + r, e);
        }
    }

    public byte[] toJSONBytes(T r) {
        try {
            return jsonMapper.writeValueAsBytes(r);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + r, e);
        }
    }
}
```
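Given the JsonFormat annotations on UserBehavior, each Kafka message body will be a JSON document along these lines (the field values here are hypothetical):

```json
{
  "user_id": 543462,
  "item_id": 1715,
  "category_id": 1464116,
  "behavior": "pv",
  "ts": "2017-11-26T01:00:00Z"
}
```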


  5. KafkaProducer, the utility class that sends messages to Kafka:


```java
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.function.Consumer;

public class KafkaProducer implements Consumer<UserBehavior> {

    private final String topic;
    private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
    private final JsonSerializer<UserBehavior> serializer;

    public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
        // NOTE: the source excerpt breaks off at this point; the body below is a
        // reconstruction using the standard kafka-clients setup, not verbatim code
        this.topic = kafkaTopic;
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaBrokers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
        this.serializer = new JsonSerializer<>();
    }

    @Override
    public void accept(UserBehavior record) {
        // serialize the bean to JSON bytes and send it to the configured topic
        byte[] data = serializer.toJSONBytes(record);
        producer.send(new ProducerRecord<>(topic, data));
    }
}
```

(The source excerpt ends at the constructor signature; everything from the constructor body onward is a hedged reconstruction based on the standard kafka-clients producer API.)