Kafka是一个分布式的、可分区的、可复制的消息系统。Kafka将消息以topic为单位进行归纳;将向 Kafka topic发布消息的程序称为producers(生产者),将预订topics并消费消息的程序成为consumer(消费者)。Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker。producers通过网络将消息发送到Kafka集群,集群向消费者提供消息,如下图所示:
1、创建SpringBoot工程
搭建项目之前先确保我们有kafka环境,如果需要自己搭建,可以参考linux环境下kafka3.2.0单机版和集群安装教程。
首先搭建两个springboot项目,一个作为生产者,另一个作为消费者。当然也可以把生产者和消费者放在同一个项目下。本例中我们放在同一个项目中。SpringBoot项目的创建过程不再赘述,创建后的工程结构如下:
从上图可以看到,controller包下面放kafka的生产者请求api,service下是生产者和消费者,项目采用maven管理。
2、配置及代码
2.1 pom.xml配置
在pom文件中引入 kafka依赖:
<!-- Springboot整合 Kafka使用。注意:版本一致 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
pom.xml文件完整示例:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.founder</groupId>
<artifactId>springboot-xhjz</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-xhjz</name>
<description>springboot-xhjz</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<!-- 打包不带版本号 -->
<finalName>xhjz-kafka</finalName>
<plugins>
<!-- jar运行配置 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.6.4</version>
</plugin>
</plugins>
</build>
<!-- 阿里云maven仓库 -->
<repositories>
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
2.2 yaml配置文件
在application.yml中配置应用基础信息及Kafka服务信息。
server:
port: 8081
spring:
application:
name: xhjz-kafka
kafka:
# kafka集群信息
bootstrap-servers: 172.19.82.206:9092
# bootstrap-servers: 192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092
producer: # 生产者配置
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 当生产端积累的消息达到batch-size或接收到消息linger.ms(单位为毫秒)后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
properties:
linger:
ms: 100
consumer:
group-id: default-group # 消费者组
enable-auto-commit: false # 关闭自动提交
auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
listener 配置参数的作用就是影响消费者监听器(ListenerConsumer)处理之后提交动作。一般我们选用手动提交。有以下几种取值:
- RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
- BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
- TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
- COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
- COUNT_TIME:TIME | COUNT 有一个条件满足时提交
- MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
- MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交
2.3 生产者和消费者代码
生产者代码
使用 KafkaTemplate 类的 send 方法发送消费。
- KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法。
import com.founder.xhjz.config.AppTopicConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* kafka生产者用来进行消息生产
*/
@Component
public class KafkaProducer {
private final static String TOPIC_NAME = "test"; //topic的名称
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private AppTopicConfig appTopicConfig;
/**
* 分区发送消息(默认异步方式)
*/
public void send() {
// 发送字符串消息
// 未指定分区发送
kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
// 指定分区发送
// kafkaTemplate.send(TOPIC_NAME, 0, "key", "test message send~");
// 发送json数据
// 1.构建消息数据
// Order order = new Order(orderId, "订单-" + orderId, 1000.00);
// 2.发送消息
// 未指定分区发送
// kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
// 指定分区发送
// kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
}
/**
* 同步发送消息
*/
public void syncSendMsg() {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
SendResult<String, String> sendResult = null;
try {
sendResult = future.get();
// 注意,可以设置等待时间,超出后,不再等候结果
// SendResult<String, String> result = future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
// } catch (TimeoutException e) {
// throw new RuntimeException(e);
}
RecordMetadata metadata = sendResult.getRecordMetadata();
ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
/**
* 异步发送消息
*/
public void asyncSendMsg() {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
//发送消息成功回调
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println(result.getProducerRecord());
//getRecordMetadata里面存在发送的topic和partition等信息
System.out.println(result.getRecordMetadata());
}
//发送消息失败回调
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
});
}
// 发送指定消息
public void send(String msg) {
kafkaTemplate.send(TOPIC_NAME, "key", msg);
}
}
消费者监听注解
使用 @KafkaListener 注解来注入消费者。常见参数如下:
- group-id:表示消费组,如果没有指定,则会使用配置文件中设置的默认的groupId。
- topicPartitions:一个消费组可以消费多个主题分区
- TopicPartition:主题分区相关
- concurrency:同组下的消费者个数,必须小于等于分区总数,大于意义不大,没必要大于。
@KafkaListener(
groupId = "mySpringBootGroup",
topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(
topic = "topic2",
partitions = "0",
partitionOffsets = @PartitionOffset(
partition = "1",
initialOffset = "100"))
},
concurrency = "6"
)
消费者代码
listenGroup1和listenGroup2为二个消费者,都对同一消息进行消费,一般情况下,只需要保留一个即可。消费者通过@KafkaListener注解消费消息。
- 如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener 注解来注入多个消费者即可。
- 消费者可以独立一个项目。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* kafka消费者
*/
@Component
public class KafkaConsumer {
static int nums = 0;
// kafka的监听器,topic为"test",消费者组为"testGroup", 如果不指定group,默认取yml里配置的
// 同一group下的两个消费者均分消息,testGroup下的2个消费者均分消息
@KafkaListener(topics = "test", groupId = "testGroup")
public void listenGroup1(ConsumerRecord<String, String> record, Acknowledgment ack) {
String topic = record.topic();
String value = record.value();
System.out.println("----consumer 1-----" + nums++ + "---------");
System.out.println("topic = " + topic + ", value = " + value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}
// 同一group下的两个消费者均分消息,testGroup下的2个消费者均分消息
@KafkaListener(topics = "test", groupId = "testGroup")
public void listenGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String topic = record.topic();
String value = record.value();
System.out.println("----consumer 2-----" + nums++ + "---------");
System.out.println("topic = " + topic + ", value = " + value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}
// 如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener注解来注入多个消费组即可。testGroup2下只有1个消费者,得到全部消息
@KafkaListener(topics = "test", groupId = "testGroup2")
public void listenGroup3(ConsumerRecord<String, String> record, Acknowledgment ack) {
String topic = record.topic();
String value = record.value();
System.out.println("topic = " + topic + ", value = " + value);
System.out.println(record);
ack.acknowledge();
}
}
2.4 生产者控制器
kafka生产者控制器,实际上就是一个Controller,对外提供访问接口,用来进行消息生产。
import com.founder.xhjz.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*/
@RestController
@RequestMapping("/api")
public class KafkaProdController {
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping("/send")
public void send() {
kafkaProducer.send();
}
// 发送指定消息
@GetMapping("/send/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
kafkaProducer.send(msg);
}
}
3、生产及消费测试
启动SpringBoot服务并调用生产者接口 http://localhost:8081/api/send(调用3次),可以在控制台看到消息已经被2个消费者处理。
---------0---------
topic = test, value = test message send~
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1659508905348, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1659508905348, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
---------1---------
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1659508922093, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1659508922093, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
---------2---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1659508940922, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1659508940922, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
关闭掉消费者testGroup2,即去掉或注释 @KafkaListener(topics = "test", groupId = "testGroup2")
, 再次启动服务并调用生产者接口http://localhost:8081/api/send(调用3次),可以在控制台看到消息只被1个消费者处理。
---------0---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 24, CreateTime = 1659509111424, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
---------1---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 25, CreateTime = 1659509114522, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
---------2---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1659509116559, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
并调用生产者接口 http://localhost:8081/api/send/111111 发送指定消息,可以在控制台看到指定消息 111111 已经被消费者处理。
---------3---------
topic = test, value = 111111
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 35, CreateTime = 1659511200699, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 111111)
4、同步及异步发送消息
4.1 同步发送消息
同步发送消息时,需要在每次send()方法时调用get()方法,因为每次send()方法会返回一个Future类型的值,Future的get()方法会一直阻塞,直到该线程的任务获取到返回值,即当消息发送成功。
- 调同步发送时请求被阻断,一直等待返回结果(消息发送成功),超时后返回错误。
在 KafkaProducer 代码中添加同步发送消息方法 syncSendMsg()。
/**
* 同步发送消息
*/
public void syncSendMsg(){
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
SendResult<String, String> sendResult = null;
try {
sendResult = future.get();
// 注意,可以设置等待时间,超出后,不再等候结果
// SendResult<String, String> result = future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
// } catch (TimeoutException e) {
// e.printStackTrace();
}
RecordMetadata metadata = sendResult.getRecordMetadata();
ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
4.2 异步发送消息
异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。
- 注:batch.size默认是16384,linger.ms默认是0。
- 调异步发送时(默认发送接口),请求立刻返回。
异步发送的消息怎么确认发送情况?
示例1
/**
* 异步发送消息
*/
public void asyncSendMsg() {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
//发送消息成功回调
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println(result.getProducerRecord());
//getRecordMetadata里面存在发送的topic和partition等信息
System.out.println(result.getRecordMetadata());
}
//发送消息失败回调
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
});
}
示例2
使用注册监听,自定义生产者发送结果监听类,需要实现 ProducerListener类。
- 发送消息成功则会回调用 onSuccess方法。
- 发送消息失败则会回调用 onError方法。
@Configuration
public class KafkaListener {
private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);
@Autowired
KafkaTemplate kafkaTemplate;
//配置监听
@PostConstruct
private void listener() {
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
logger.info("ok,message={}", producerRecord.value());
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
logger.error("error!message={}", producerRecord.value());
}
});
}
}
5、分区策略
分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。
- 给定了分区号,直接将数据发送到指定的分区里面去
- 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
- 既没有给定分区号,也没有给定key值,直接轮循进行分区(默认)
- 自定义分区,你想怎么做就怎么做
5.1 验证默认分区规则
发送者代码参考:PartitionProducer.java
//测试分区发送
@RestController
public class PartitionProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
// 指定分区发送
// 不管你key是什么,到同一个分区
@GetMapping("/kafka/partitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0号分区");
}
// 指定key发送,不指定分区
// 根据key做hash,相同的key到同一个分区
@GetMapping("/kafka/keysend/{key}")
public void setKey(@PathVariable("key") String key) {
kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分区");
}
}
消费者代码使用:PartitionConsumer.java
@Component
public class PartitionConsumer {
private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);
//分区消费
@KafkaListener(topics = {"test"},topicPattern = "0")
public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=0,message:[{}]", msg);
}
}
@KafkaListener(topics = {"test"},topicPattern = "1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=1,message:[{}]", msg);
}
}
}
访问setKey(也就是只给了key的方法),可以看到key相同的被hash到了同一个分区:
再访问setPartition来设置分区号0来发送,可以看到无论key是什么,都是分区0来消费:
5.2 自定义分区
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 定义自己的分区策略
// 如果key以0开头,发到0号分区
// 其他都扔到1号分区
String keyStr = key+"";
if (keyStr.startsWith("0")){
return 0;
}else {
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
@Configuration
public class MyPartitionTemplate {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
KafkaTemplate kafkaTemplate;
@PostConstruct
public void setKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//注意分区器在这里!!!
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
public KafkaTemplate getKafkaTemplate(){
return kafkaTemplate;
}
}
发送使用:MyPartitionProducer.java。
//测试自定义分区发送
@RestController
public class MyPartitionProducer {
@Autowired
MyPartitionTemplate template;
// 使用0开头和其他任意字母开头的key发送消息
// 看控制台的输出,在哪个分区里?
@GetMapping("/kafka/myPartitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");
}
}
发送0开头和非0开头两种key,可以看到0开头的key发送到了分区0,其余的都分发到了分区1:
评论区