Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,使用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
1、kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
kafka使用场景
- 日志收集:可以用kafka收集各种服务的日志,通过统一接口的形式开放给各种消费者,例如hadoop、Hbase、Solr等。
- 消息系统:解耦生产和消费者,缓存消息。
- 用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm。
2、kafka的下载
可以从kafka的官方网站下载最新的安装包: http://archive.apache.org/dist/kafka/
我们这里选择版本 3.2.0/,点击进入。
点击 kafka_2.12-3.2.0.tgz 进行下载。
3、单机安装kafka
3.1 前提条件
安装kafka的服务器必须有jvm环境,最低要求 JDK 必须是7或以上版本。JDK8的安装教程可参考:Linux下JAVA WEB服务器的搭建一(JDK\MYSQL\TOMCAT)。
检查jdk环境:java -version
,如下图所示,表示jdk环境已安装成功。
安装kafka的服务器还必须安装了zookeeper,安装教程可参考:linux环境下zookeeper3.8.0的安装教程
3.2 kafka安装包上传解压
使用xftp工具将下载好的 kafka_2.12-3.2.0.tgz 安装包上传到linux服务器,我是上传到了 /opt/kafka 目录,解压。
# 在opt目录下新建/kafka目录
cd /opt/
mkdir kafka
cd kafka
# 使用xftp工具或用rz命令将下载的压缩包上传到/opt/kafka目录下
# 解压
tar -xzf kafka_2.12-3.2.0.tgz
3.3 kafka的配置
创建目录 /usr/local/kafka/logs,用来存放 kafka 的存储持久化数据目录。
cd /usr/local
mkdir -p kafka/logs
修改配置文件,进入 /opt/kafka/kafka_2.12-3.2.0/conf 目录,修改 server.properties 文件的配置。
cd config/
vim server.properties
使用 vim server.properties 命令修改 server.properties 的配置,修改完后按esc,输入:wq!保存退出。
# 第一步:放开listeners
listeners=PLAINTEXT://172.19.82.206:9092
# 第二步:配置kafka存储持久化数据目录,修改路径:注意是修改,找到原来的修改为这个
log.dirs=/usr/local/kafka/logs
# 第三步:因为kafka是基于zookeeper的所以要加上zookeeper的ip和端口,如果是同一台机器,localhost:2181/myKafka
# 该步骤配置的是 kafka 连接 zk 的地址,以及 kafka 在 zk 上的根目录 myKafka。
zookeeper.connect=zookeeper服务器的ip地址:2181/myKafka
如果 Kafka 需开启远程连接,则修改上述配置的listeners。
# 把下述配置注释
listeners=PLAINTEXT://:172.19.82.206:9092
# 配置下述内容
advertised.listeners=PLAINTEXT://172.19.82.206:9092
3.4 启动kafka服务端
进入kafka的bin目录下:以下命令选择一种启动方式来启动kafka。
- 启动kafka之前必须保证zookeeper服务是正常启动的。
- 以守护线程的方式启动(运行的日志打印在 logs 目录里的server.log 里):
./kafka-server-start.sh -daemon ../config/server.properties
- 后台启动,不会打印日志到控制台(命令后面不加 & 则是前台模式启动,要停止可以使用Ctrl+C):
./kafka-server-start.sh ../config/server.properties &
到这里就成功安装并启动kafka了。
查看Kafka的后台进程
ps -ef | grep kafka
停止后台运行的Kafka
./kafka-server-stop.sh
3.5 错误排查
错误1
程序无法连接kafka服务,打开 /opt/kafka/kafka_2.12-3.2.0/logs 目录下的 server.log 文件能看到这么一串报错信息:
[2022-08-02 19:02:18,998] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID vlIYi5wxQ8iWsBC-CYOqfA doesn't match stored clusterId Some(Bp0j9RLGQISU7dXyJJiJpQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:230)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
大概意思是代理正在尝试加入错误的集群。配置的zookeeper.connect可能错误。
找到 kafka 的存储持久化数据目录 /usr/local/kafka/logs,删除下面的 meta.properties 文件,然后重新启动 kafka 服务。
错误2
报了org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /home/hadoop/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /home/hadoop/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:240)
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:236)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at kafka.log.LogManager.lockLogDirs(LogManager.scala:236)
at kafka.log.LogManager.<init>(LogManager.scala:97)
at kafka.log.LogManager$.apply(LogManager.scala:1011)
at kafka.server.KafkaServer.startup(KafkaServer.scala:240)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:42)
at kafka.Kafka$.main(Kafka.scala:75)
at com.cloudera.kafka.wrap.Kafka$$anonfun$1.apply(Kafka.scala:92)
at com.cloudera.kafka.wrap.Kafka$$anonfun$1.apply(Kafka.scala:92)
at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:103)
at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:95)
at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
顾名思义 就是kafka服务启动后,不小心退出,按jps查看当前的进程
$ jps
2274 DataNode
2134 QuorumPeerMain
2375 JournalNode
2200 NameNode
2617 NodeManager
2506 DFSZKFailoverController
2781 ResourceManager
6013 Jps
4766 Kafka
然后 kill -9 4766
(4766是进程号)杀死kafka的进程,重启kafka。
3.6 开放防火墙端口
Ubuntu防火墙开放9092端口命令:ufw allow 9092
4、测试以及使用
4.1 主题
# 列出现有的主题
[root@master1 ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka
# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test --partitions 1 --replication-factor 1
# 查看分区信息
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 查看指定主题的详细信息
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test
# 删除指定主题
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_test
列出现有主题,创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。
查看指定主题的详细信息
创建主题,该主题包含多个分区(多个分区:横向扩展; 多个副本:高可用)
4.2 生产者
kafka-console-producer.sh用于生产消息
# 开启生产者
[root@node1 ~]# kafka-console-producer.sh --topic topic_test --broker-list localhost:9020
不要关闭这个窗口,同时创建消费者。
4.3 消费者
kafka-console-consumer.sh用于消费消息。
# 开启消费者方式一
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test
# 开启消费者方式二,从头消费,不按照偏移量消费
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning
此时,往生产者窗口写入消息,消费者窗口也能同步的接收到消息。
查看topic:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic
topic_test
4.4 生产并消费消息
开启消费者和生产者,生产并消费消息
消费者,按照偏移量消费
消费者从头消费,不按照偏移量消费
5、重要(操作日志的处理)
kafka启动后,如果你去查看kafka所在的根目录,或者是kafka本身的目录,会发现已经默认生成一堆操作日志(这样看起来真心很乱),而且会不断生成不同时间戳的操作日志。
- 找到config下的log4j.properties,将路径更改下即可,这样就可以归档在一个文件夹下边了,路径根据自己喜好定义。
- 另外如何消除不断生成日志的问题,就是同一天的不同时间会不停生成。还是在log4j.properties中,本身都为trace,将其改为INFO即可。
6、集群安装kafka
6.1 环境准备
准备集群需要的服务器,集群IP如下:
- node1: 192.168.9.135
- node2: 192.168.9.136
- node3: 192.168.9.137
6.2 下载解压kafka程序包
和单机版操作一样,往集群的每个服务器上下载解压kafka程序包。
6.3 kafka的配置
在每个服务器上创建目录 /usr/local/kafka/logs,用来存放 kafka 的存储持久化数据目录。
cd /usr/local
mkdir -p kafka/logs
修改配置文件,进入每台服务器的 /opt/kafka/kafka_2.12-3.2.0/conf 目录,修改 server.properties 文件的配置。
cd config/
vim server.properties
node1配置修改
使用 vim server.properties 命令修改 server.properties 的配置,修改完后按esc,输入:wq!保存退出。
#修改配置文件的内容,或者是在原有的基础上添加上去
#broker的全局唯一编号,每台机器都不可以重复,第一台可以是0,第二三台可以为1,2
broker.id=0
# 修改listeners = PLAINTEXT://your.host.name:9092 中的ip为当前kafka所在的服务器的ip
listeners=PLAINTEXT://172.19.82.206:9092
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径,默认自动创建,方便查看报错信息
log.dirs=/usr/local/kafka/logs
#配置连接Zookeeper集群地址 /kafka 会创建单独的文件夹,根据你的集群修改
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
分发配置好的安装包到集群其他节点(node2,node3)
将node1的配置复制到其他节点,也可以分别在其他节点自主配置。
scp -r kafka_2.12-3.2.0 node2:/opt/kafka/kafka_2.12-3.2.0
scp -r kafka_2.12-3.2.0 node3:/opt/kafka/kafka_2.12-3.2.0
在node2与node3主机中将分发过去的kafka安装包下的server.properties进行修改,主要修改的部分是broker.id。
# 这里要注意broker.id是不能重复的
在node2主机上修改成broker.id=1
在node3主机上修改成broker.id=2
# your.host.name分别是node2和node3的ip
listeners = PLAINTEXT://your.host.name:9092
6.4 集群启动
# 分别在三台机器的kafka目录下运行启动命令,输入jps,查看到kafka进程即可
./kafka-server-start.sh -daemon ../config/server.properties
按照步骤来执行,一般不会出错,如果出现错误,可能是zk集群地址不对,或者是broker.id的值没有修改导致出现3台重复的broker。
6.5 集群测试
安装好kafka集群之后我们需要进行一下集群的测试,看是否可用。
- 创建Topic test1
./bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181/kafka --replication-factor 1 --partitions 1 --topic test1
- 查看创建的Topic列表
bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181/kafka
- 创建生产者生产数据
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test1
- 测试消费者是否可以消费数据
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test1 --from-beginning
评论区