Sending CSV data to Kafka (Java edition)
Each record in the CSV dataset contains the following fields:

| Field | Description |
| :-- | :-- |
| User ID | Integer; the serialized user ID |
| Item ID | Integer; the serialized item ID |
| Category ID | Integer; the serialized ID of the category the item belongs to |
| Behavior type | String; an enumeration with the values ('pv', 'buy', 'cart', 'fav') |
| Timestamp | The timestamp at which the behavior occurred |
| Time string | A time string generated from the timestamp field |
For details about this dataset, see the earlier post 《准备数据集用于flink学习》 (Preparing a dataset for Flink learning).
About the Java application
Before writing any code, let's list everything that needs to be built, then implement each piece in turn:

1. A utility class that reads records from the CSV file: UserBehaviorCsvFileReader
2. A bean class representing one record: UserBehavior
3. A serializer class that turns Java objects into JSON: JsonSerializer
4. A utility class that sends messages to Kafka: KafkaProducer
5. The application class and program entry point: SendMessageApplication

These five classes are all the Java application needs, as the conceptual sketch below shows; now let's start coding.
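Conceptually, the reader acts as a java.util.function.Supplier, the Kafka sender as a java.util.function.Consumer, and the Java 8 Stream API drives the loop between them. A rough sketch of the idea (not the final code):

```java
// Conceptual shape of the application:
Stream.generate(csvFileReader)        // Supplier<UserBehavior>: yields one record per call
      .forEachOrdered(kafkaProducer); // Consumer<UserBehavior>: serializes it and sends to Kafka
```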
Download the source code directly
If you would rather not write the code yourself, you can download the complete project source from GitHub; the addresses and links are listed in the table below:
| Name | Link | Note |
| :-- | :-- | :-- |
| Project homepage | https://github.com/zq2599/blog_demos | The project's homepage on GitHub |
| Git repository address (https) | https://github.com/zq2599/blog_demos.git | The project's source repository, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | The project's source repository, ssh protocol |
This Git project contains several folders; the source for this article is under the flinksql folder.
Coding
Create a Maven project with the pom.xml shown below; the most important dependencies are jackson and javacsv:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>flinksql</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<kafka.version>2.2.0</kafka.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.javacsv</groupId>
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>:</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
The utility class that reads records from the CSV file is UserBehaviorCsvFileReader. The main program later uses the Java 8 Stream API to process the records as a collection, so UserBehaviorCsvFileReader implements the Supplier interface:
```java
import com.csvreader.CsvReader;

import java.io.IOException;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {

    private final String filePath;
    private CsvReader csvReader;

    public UserBehaviorCsvFileReader(String filePath) throws IOException {
        this.filePath = filePath;
        try {
            csvReader = new CsvReader(filePath);
            csvReader.readHeaders();
        } catch (IOException e) {
            throw new IOException("Error reading UserBehavior records from file: " + filePath, e);
        }
    }

    @Override
    public UserBehavior get() {
        UserBehavior userBehavior = null;
        try {
            if (csvReader.readRecord()) {
                userBehavior = new UserBehavior(
                        Long.valueOf(csvReader.get(0)),
                        Long.valueOf(csvReader.get(1)),
                        Long.valueOf(csvReader.get(2)),
                        csvReader.get(3),
                        // the CSV stores the timestamp in seconds; Date expects milliseconds
                        new Date(Long.valueOf(csvReader.get(4)) * 1000L));
            }
        } catch (IOException e) {
            throw new NoSuchElementException("IOException from " + filePath);
        }
        // Stream.generate has no notion of "end of stream", so this Supplier
        // signals exhaustion by throwing NoSuchElementException
        if (null == userBehavior) {
            throw new NoSuchElementException("All records read from " + filePath);
        }
        return userBehavior;
    }
}
```
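Because UserBehaviorCsvFileReader is a Supplier, it plugs directly into Stream.generate. A quick standalone check might look like this (the CSV path is a placeholder to adapt):

```java
import java.io.IOException;
import java.util.stream.Stream;

public class ReaderDemo {
    public static void main(String[] args) throws IOException {
        // placeholder path: point it at your local copy of the dataset
        UserBehaviorCsvFileReader reader = new UserBehaviorCsvFileReader("/tmp/UserBehavior.csv");
        // limit(3) pulls only the first three records instead of draining the whole file
        Stream.generate(reader)
              .limit(3)
              .map(UserBehavior::getBehavior)
              .forEach(System.out::println);
    }
}
```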
The bean class for each record is UserBehavior; its fields simply mirror the CSV record format. The ts field, which represents time, carries a JsonFormat annotation that controls its format during serialization:
```java
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Date;

public class UserBehavior {

    @JsonFormat
    private long user_id;
    @JsonFormat
    private long item_id;
    @JsonFormat
    private long category_id;
    @JsonFormat
    private String behavior;
    // serialize ts as a string such as "2017-11-26T01:00:00Z" instead of a raw epoch number
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public UserBehavior() {
    }

    public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
        this.user_id = user_id;
        this.item_id = item_id;
        this.category_id = category_id;
        this.behavior = behavior;
        this.ts = ts;
    }

    // getters keep the underscored names so Jackson derives the JSON field names
    // user_id, item_id, category_id, behavior and ts; without them the default
    // ObjectMapper cannot see the private fields
    public long getUser_id() { return user_id; }

    public long getItem_id() { return item_id; }

    public long getCategory_id() { return category_id; }

    public String getBehavior() { return behavior; }

    public Date getTs() { return ts; }
}
```
The serializer class that turns Java objects into JSON, JsonSerializer:
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> {

    // ObjectMapper is thread-safe and relatively expensive to create, so one instance is reused
    private final ObjectMapper jsonMapper = new ObjectMapper();

    public String toJSONString(T r) {
        try {
            return jsonMapper.writeValueAsString(r);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + r, e);
        }
    }

    public byte[] toJSONBytes(T r) {
        try {
            return jsonMapper.writeValueAsBytes(r);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + r, e);
        }
    }
}
```
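To see what the JsonFormat annotation does to the ts field, here is a small example that serializes one invented record (the sample values are made up, and the exact field order may vary):

```java
import java.util.Date;

public class SerializerDemo {
    public static void main(String[] args) {
        JsonSerializer<UserBehavior> serializer = new JsonSerializer<>();
        // invented sample values; 1511658000 seconds is 2017-11-26T01:00:00Z
        UserBehavior sample = new UserBehavior(543462L, 1715L, 1464116L, "pv",
                new Date(1511658000L * 1000L));
        // expected output, roughly:
        // {"user_id":543462,"item_id":1715,"category_id":1464116,"behavior":"pv","ts":"2017-11-26T01:00:00Z"}
        System.out.println(serializer.toJSONString(sample));
    }
}
```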
The utility class that sends messages to Kafka is KafkaProducer. It implements Consumer, serializing each UserBehavior to JSON bytes and sending them to the target topic; the producer configuration shown here is a minimal sketch that assumes byte-array key/value serializers:
```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;
import java.util.function.Consumer;

public class KafkaProducer implements Consumer<UserBehavior> {

    private final String topic;
    private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
    private final JsonSerializer<UserBehavior> serializer;

    public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
        this.topic = kafkaTopic;
        // minimal producer configuration (an assumption): broker list plus byte-array serializers
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
        this.serializer = new JsonSerializer<>();
    }

    @Override
    public void accept(UserBehavior record) {
        // serialize the record to JSON bytes and send it to the topic
        byte[] data = serializer.toJSONBytes(record);
        producer.send(new ProducerRecord<>(topic, data));
    }
}
```
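Finally, the entry class SendMessageApplication strings the Supplier and Consumer together. A minimal sketch, assuming a local broker, a user_behavior topic, and a placeholder CSV path (adapt all three to your environment):

```java
import java.util.stream.Stream;

public class SendMessageApplication {
    public static void main(String[] args) throws Exception {
        // placeholders: CSV path, Kafka topic, and broker list
        UserBehaviorCsvFileReader reader = new UserBehaviorCsvFileReader("/tmp/UserBehavior.csv");
        KafkaProducer producer = new KafkaProducer("user_behavior", "127.0.0.1:9092");
        // generate records until the reader signals end-of-file by throwing
        // NoSuchElementException, which ends the run
        Stream.generate(reader)
              .sequential()
              .forEachOrdered(producer);
    }
}
```

Note that in longer-lived production code you would also close the Kafka producer to flush any buffered messages before the JVM exits.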