侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 297 篇文章
  • 累计创建 134 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

linux环境下kafka3.2.0单机版和集群安装教程

孔子说JAVA
2022-08-05 / 0 评论 / 1 点赞 / 98 阅读 / 8,008 字 / 正在检测是否收录...

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,使用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

0c0d755cccf880fdc1ad7d08528b4808

1、kafka介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  1. 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  2. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  3. 可扩展性:kafka集群支持热扩展
  4. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  5. 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  6. 高并发:支持数千个客户端同时读写

kafka使用场景

  1. 日志收集:可以用kafka收集各种服务的日志,通过统一接口的形式开放给各种消费者,例如hadoop、Hbase、Solr等。
  2. 消息系统:解耦生产和消费者,缓存消息。
  3. 用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。
  4. 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  5. 流式处理:比如spark streaming和storm。

2、kafka的下载

可以从kafka的官方网站下载最新的安装包: http://archive.apache.org/dist/kafka/

image-1659435132359

我们这里选择版本 3.2.0/,点击进入。

image-1659435229842

点击 kafka_2.12-3.2.0.tgz 进行下载。

3、单机安装kafka

3.1 前提条件

安装kafka的服务器必须有jvm环境,最低要求 JDK 必须是7或以上版本。JDK8的安装教程可参考:Linux下JAVA WEB服务器的搭建一(JDK\MYSQL\TOMCAT)

检查jdk环境:java -version,如下图所示,表示jdk环境已安装成功。

image-1659426394104

安装kafka的服务器还必须安装了zookeeper,安装教程可参考:linux环境下zookeeper3.8.0的安装教程

3.2 kafka安装包上传解压

使用xftp工具将下载好的 kafka_2.12-3.2.0.tgz 安装包上传到linux服务器,我是上传到了 /opt/kafka 目录,解压。

# 在opt目录下新建/kafka目录

cd /opt/
mkdir kafka

cd kafka

# 使用xftp工具或用rz命令将下载的压缩包上传到/opt/kafka目录下
# 解压

tar -xzf kafka_2.12-3.2.0.tgz

3.3 kafka的配置

创建目录 /usr/local/kafka/logs,用来存放 kafka 的存储持久化数据目录。

cd /usr/local
mkdir -p kafka/logs

修改配置文件,进入 /opt/kafka/kafka_2.12-3.2.0/conf 目录,修改 server.properties 文件的配置。

cd config/

vim server.properties

使用 vim server.properties 命令修改 server.properties 的配置,修改完后按esc,输入:wq!保存退出。

# 第一步:放开listeners
listeners=PLAINTEXT://172.19.82.206:9092
 
# 第二步:配置kafka存储持久化数据目录,修改路径:注意是修改,找到原来的修改为这个
log.dirs=/usr/local/kafka/logs
 
# 第三步:因为kafka是基于zookeeper的所以要加上zookeeper的ip和端口,如果是同一台机器,localhost:2181/myKafka
# 该步骤配置的是 kafka 连接 zk 的地址,以及 kafka 在 zk 上的根目录 myKafka。
zookeeper.connect=zookeeper服务器的ip地址:2181/myKafka

如果 Kafka 需开启远程连接,则修改上述配置的listeners。

# 把下述配置注释
listeners=PLAINTEXT://:172.19.82.206:9092
# 配置下述内容
advertised.listeners=PLAINTEXT://172.19.82.206:9092

3.4 启动kafka服务端

进入kafka的bin目录下:以下命令选择一种启动方式来启动kafka。

  • 启动kafka之前必须保证zookeeper服务是正常启动的。
  1. 以守护线程的方式启动(运行的日志打印在 logs 目录里的server.log 里):
./kafka-server-start.sh -daemon  ../config/server.properties
  1. 后台启动,不会打印日志到控制台(命令后面不加 & 则是前台模式启动,要停止可以使用Ctrl+C):
./kafka-server-start.sh  ../config/server.properties &

到这里就成功安装并启动kafka了。

查看Kafka的后台进程

ps -ef | grep kafka

停止后台运行的Kafka

./kafka-server-stop.sh 

image-1659437209418

3.5 错误排查

错误1

程序无法连接kafka服务,打开 /opt/kafka/kafka_2.12-3.2.0/logs 目录下的 server.log 文件能看到这么一串报错信息:

[2022-08-02 19:02:18,998] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID vlIYi5wxQ8iWsBC-CYOqfA doesn't match stored clusterId Some(Bp0j9RLGQISU7dXyJJiJpQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:230)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)

大概意思是代理正在尝试加入错误的集群。配置的zookeeper.connect可能错误。

找到 kafka 的存储持久化数据目录 /usr/local/kafka/logs,删除下面的 meta.properties 文件,然后重新启动 kafka 服务。

错误2

报了org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /home/hadoop/log/kafka-logs. A Kafka instance in another process or thread is using this directory.

org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /home/hadoop/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:240)
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:236)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at kafka.log.LogManager.lockLogDirs(LogManager.scala:236)
	at kafka.log.LogManager.<init>(LogManager.scala:97)
	at kafka.log.LogManager$.apply(LogManager.scala:1011)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:240)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:42)
	at kafka.Kafka$.main(Kafka.scala:75)
	at com.cloudera.kafka.wrap.Kafka$$anonfun$1.apply(Kafka.scala:92)
	at com.cloudera.kafka.wrap.Kafka$$anonfun$1.apply(Kafka.scala:92)
	at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:103)
	at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:95)
	at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)

顾名思义 就是kafka服务启动后,不小心退出,按jps查看当前的进程

$ jps
2274 DataNode
2134 QuorumPeerMain
2375 JournalNode
2200 NameNode
2617 NodeManager
2506 DFSZKFailoverController
2781 ResourceManager
6013 Jps
4766 Kafka

然后 kill -9 4766 (4766是进程号)杀死kafka的进程,重启kafka。

3.6 开放防火墙端口

Ubuntu防火墙开放9092端口命令:ufw allow 9092

image-1659437475503

4、测试以及使用

4.1 主题

# 列出现有的主题
[root@master1 ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka

# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test --partitions 1 --replication-factor 1

# 查看分区信息
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list

# 查看指定主题的详细信息
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test 

# 删除指定主题
[root@master1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_test 

列出现有主题,创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。

image-1659486133679

查看指定主题的详细信息

image-1659486143308

创建主题,该主题包含多个分区(多个分区:横向扩展; 多个副本:高可用)

image-1659486170070

4.2 生产者

kafka-console-producer.sh用于生产消息

# 开启生产者
[root@node1 ~]# kafka-console-producer.sh --topic topic_test --broker-list  localhost:9020

不要关闭这个窗口,同时创建消费者。

4.3 消费者

kafka-console-consumer.sh用于消费消息。

# 开启消费者方式一
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test 

# 开启消费者方式二,从头消费,不按照偏移量消费
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning

此时,往生产者窗口写入消息,消费者窗口也能同步的接收到消息。

查看topic:

 .\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic
topic_test

4.4 生产并消费消息

开启消费者和生产者,生产并消费消息

image-1659486320638

消费者,按照偏移量消费

image-1659486340156

消费者从头消费,不按照偏移量消费

image-1659486351331

5、重要(操作日志的处理)

kafka启动后,如果你去查看kafka所在的根目录,或者是kafka本身的目录,会发现已经默认生成一堆操作日志(这样看起来真心很乱),而且会不断生成不同时间戳的操作日志。

  1. 找到config下的log4j.properties,将路径更改下即可,这样就可以归档在一个文件夹下边了,路径根据自己喜好定义。
  2. 另外如何消除不断生成日志的问题,就是同一天的不同时间会不停生成。还是在log4j.properties中,本身都为trace,将其改为INFO即可。

6、集群安装kafka

6.1 环境准备

准备集群需要的服务器,集群IP如下:

  1. node1: 192.168.9.135
  2. node2: 192.168.9.136
  3. node3: 192.168.9.137

6.2 下载解压kafka程序包

和单机版操作一样,往集群的每个服务器上下载解压kafka程序包。

6.3 kafka的配置

在每个服务器上创建目录 /usr/local/kafka/logs,用来存放 kafka 的存储持久化数据目录。

cd /usr/local
mkdir -p kafka/logs

修改配置文件,进入每台服务器的 /opt/kafka/kafka_2.12-3.2.0/conf 目录,修改 server.properties 文件的配置。

cd config/

vim server.properties

node1配置修改

使用 vim server.properties 命令修改 server.properties 的配置,修改完后按esc,输入:wq!保存退出。

#修改配置文件的内容,或者是在原有的基础上添加上去
#broker的全局唯一编号,每台机器都不可以重复,第一台可以是0,第二三台可以为1,2
broker.id=0

# 修改listeners = PLAINTEXT://your.host.name:9092 中的ip为当前kafka所在的服务器的ip
listeners=PLAINTEXT://172.19.82.206:9092

#删除topic功能使能
delete.topic.enable=true

#kafka运行日志存放的路径,默认自动创建,方便查看报错信息
log.dirs=/usr/local/kafka/logs

#配置连接Zookeeper集群地址  /kafka 会创建单独的文件夹,根据你的集群修改
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka

分发配置好的安装包到集群其他节点(node2,node3)

将node1的配置复制到其他节点,也可以分别在其他节点自主配置。

scp -r kafka_2.12-3.2.0 node2:/opt/kafka/kafka_2.12-3.2.0
scp -r kafka_2.12-3.2.0 node3:/opt/kafka/kafka_2.12-3.2.0

在node2与node3主机中将分发过去的kafka安装包下的server.properties进行修改,主要修改的部分是broker.id

# 这里要注意broker.id是不能重复的
在node2主机上修改成broker.id=1
在node3主机上修改成broker.id=2

# your.host.name分别是node2和node3的ip
listeners = PLAINTEXT://your.host.name:9092

6.4 集群启动

# 分别在三台机器的kafka目录下运行启动命令,输入jps,查看到kafka进程即可
./kafka-server-start.sh -daemon  ../config/server.properties

按照步骤来执行,一般不会出错,如果出现错误,可能是zk集群地址不对,或者是broker.id的值没有修改导致出现3台重复的broker。

6.5 集群测试

安装好kafka集群之后我们需要进行一下集群的测试,看是否可用。

  1. 创建Topic test1
./bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181/kafka --replication-factor 1 --partitions 1 --topic test1

image-1659488407045

  1. 查看创建的Topic列表
bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181/kafka

image-1659488437937

  1. 创建生产者生产数据
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test1

image-1659488474758

  1. 测试消费者是否可以消费数据
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test1 --from-beginning

image-1659488509946

1

评论区