初识 Kafka 及安装
Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Sotrm、Spark、Flink 等都支持与 Kafka 集成。
1 架构
如上图所示,一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高)、若干 Consumer,以及一个 ZooKeeper 集群,其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举以及在 Consumer Group 发生变化时进行 rebalance 等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。
消息
Kafka 的数据单元被称为消息。类似数据库库中的“数据行”或“记录”。消息有字节数组组成,可以有一个可选的元数据,也就是键,键也是一个字节数组。当消息以一种可控的方式写入不同的分区时,会用到键。
主题和分区
Kafka 的消息通过主题进行分类,主题就好比数据库的表。主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。消息在被追加到分区文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
生产者和消费者
Kafka 的客户端就是 Kafka 系统的用户,他们被分为两种基本类型:生产者和消费者。生产者创建消息,又称为发布者或写入者。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,不过在某些情况下,生产者会根据消息键和分区器把消息写到指定的分区。
消费者读取消息,又称为订阅者或读者。消费者订阅一个或多个主题,按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的,消费者把每个分区最后读取的消息偏移量保存在 ZooKeeper 或 Kafka 上。消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题,群组保证每个分区只能被一个消费者使用。
broker
一个独立的 Kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。
broker 是集群的组成部分,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控 broker。在集群中,一个分区从属与一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其它 broker 可以接管领导权。不过相关的消费者和生产者都要重新连接到新的首领。
2 Kafka 的应用场景
消息系统
Kafka 和传统的消息系统都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
存储系统
Kafka 把消息持久化到磁盘,相比于其它基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能既可。
流式处理平台
Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
3 常用 Message Queue 对比
RabbitMQ
RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
Redis
Redis 是一个基于 Key-Value 对的 NoSQL 数据库,开发维护很活跃。虽然它是一个 Key-Value 数据库存储系统,但它本身支持 MQ 功能,所以完全可以当做一个轻量级的队列服务来使用。对于 RabbitMQ 和 Redis 的入队和出队操作,各执行 100 万次,每 10 万次记录一次执行时间。测试数据分为 128Bytes、512Bytes、1K 和 10K 四个不同大小的数据。实验表明:入队时,当数据比较小时 Redis 的性能要高于 RabbitMQ,而如果数据大小超过了 10K,Redis 则慢的无法忍受;出队时,无论数据大小,Redis 都表现出非常好的性能,而 RabbitMQ 的出队性能则远低于 Redis。
ZeroMQ
ZeroMQ 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ 能够实现 RabbitMQ 不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对 ZMQ 能够应用成功的挑战。ZeroMQ 具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用 ZeroMQ 程序库,可以使用 NuGet 安装,然后你就可以愉快的在应用程序之间发送消息了。但是 ZeroMQ 仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter 的 Storm 0.9.0 以前的版本中默认使用 ZeroMQ 作为数据流的传输(Storm 从 0.9 版本开始同时支持 ZeroMQ 和 Netty 作为传输模块)。
ActiveMQ
ActiveMQ 是 Apache 下的一个子项目。 类似于 ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于 RabbitMQ,它使用少量代码就可以高效地实现高级应用场景。
Kafka/Jafka
Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而 Jafka 是在 Kafka 之上孵化而来的,即 Kafka 的一个升级版。具有以下特性:快速持久化,可以在 O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统,Broker、Producer、Consumer 都原生自动支持分布式,自动实现负载均衡;支持 Hadoop 数据并行加载,对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 通过 Hadoop 的并行加载机制统一了在线和离线的消息处理。Apache Kafka 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
4 Kafka 集群安装测试
4.1 环境准备
准备三台机器,修改它们的/etc/hosts,保证 3 台机器可以通过 hostname 互通网络,关闭防火墙。
硬件
软件
4.2 安装 Java
安装 jdk1.8,并配置环境变量
tar -xvf jdk-8u91-linux-x64.tar.gz
vim /etc/profile,在最后位置加入如下内容:
export JAVAHOME=/opt/jdk1.8.091
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVAHOME/lib/dt.jar:$JAVAHOME/lib/tools.jar
source /etc/profile
java –version,显示如下,安装完成。
4.3 安装 ZooKeeper
Zookeeper 集群被称为群组。 Zookeeper 使用的是一致性协议,所以建议每个群组里应该包含奇数个节点(比如 3 个、 5 个等),因为只有当群组里的大多数节点(也就是法定人数)处于可用状态, Zookeeper 才能处理外部的请求。也就是说,如果你有一个包含 3 个节点的群组,那么它允许一个节点失效。如果群组包含 5 个节点,那么它允许 2 个节点失效。
具体步骤如下:
1、解压安装包,创建相应文件夹
tar -xvf apache-zookeeper-3.6.2-bin.tar.gz
mv apache-zookeeper-3.6.2-bin zookeeper-3.6.2
mkdir /opt/zookeeper-3.6.2/data
mkdir /opt/zookeeper-3.6.2/logs
2、配置
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
dataDir=/opt/zookeeper-3.6.2/data
dataLogDir=/opt/zookeeper-3.6.2/logs
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
cd /opt/zookeeper-3.6.2/data
vim myid
1
vim /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper-3.6.2
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
3、使用命令启动:
zkServer.sh start
检查程序是否启动,存在 QuorumPeerMain 进程表示启动成功:
jps
4、打包,并发送到其它两台机器:
tar -cvf zookeeper.tar zookeeper-3.6.2/
scp zookeeper.tar node02:/opt
scp zookeeper.tar node03:/opt
将两台机器的包分别解压,并修改 myid 文件为 2,3,并配置环境变量分别启动既可。
检查 zookeeper 集群状态:
4.4 Kafka 集群搭建
1、安装及配置
tar -zxvf kafka_2.12-2.6.0.tgz
cd kafka_2.12-2.6.0/config
vim server.properties
配置内容如下:
broker.id=1
log.dirs=/opt/kafka_2.12-2.6.0/logs
zookeeper.connect=node01:2181,node02:2181,node03:2181
其中:
broker.id:kafka broker 的唯一标识
log.dirs:日志路径
zookeeper.connect:zookeeper 访问路径集合
vim /etc/profile
source /etc/profile
2、启动 Kafka
kafka-server-start.sh /opt/kafka_2.12-2.6.0/config/server.properties 1>/dev/null 2>&1 &
其中,1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思,/dev/null 代表空设备。
3、打包,并发送到其它两台机器
tar -cvf kafka.tar kafka_2.12-2.6.0
scp kafka.tar node02:/opt
scp kafka.tar node03:/opt
将两台机器的压缩包解压,并修改 broker.id 文件为 2,3,并配置环境变量分别启动既可。
4.5 测试用例
1、创建 topic
集群启动成功后,我们通过创建一个名字为 test,partitions 为 3,replication 为 3 的 topic。
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic test
2、向 topic 发送消息
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
输入 hello kafka,然后 enter 键,即向名为 test 的 topic 发送了一条消息:hello kafka
3、kafka 可视化工具
为了更好的看到上一步创建的 topic,以及发送的消息。这里介绍一个 kafka 可视化工具 Kafka Tools,安装后配置如下:
点开刚刚创建的连接,出现如下界面:
版权声明: 本文为 InfoQ 作者【犟马骝】的原创文章。
原文链接:【http://xie.infoq.cn/article/2a6d51d6910da03502b90e2f5】。文章转载请联系作者。
评论