
Spring Boot + Kafka Integration: A Complete Example

孔子说JAVA · 2022-08-06

Kafka is a distributed, partitioned, replicated messaging system. Kafka organizes messages by topic; programs that publish messages to a Kafka topic are called producers, and programs that subscribe to topics and consume the messages are called consumers. Kafka runs as a cluster of one or more servers, each of which is called a broker. Producers send messages to the Kafka cluster over the network, and the cluster serves them to consumers, as shown below:

[Figure: producers send messages over the network to the Kafka cluster (one or more brokers), which serves them to consumers]

1. Creating the Spring Boot Project

Before building the project, make sure you have a Kafka environment available. If you need to set one up yourself, see the tutorial on installing Kafka 3.2.0 (standalone and cluster) on Linux.

First create two Spring Boot projects, one as the producer and the other as the consumer. You can also put the producer and the consumer in the same project, which is what this example does. The Spring Boot project creation steps are not repeated here; the resulting project structure looks like this:

[Screenshot: project structure after creation]

As the figure shows, the controller package holds the producer's HTTP API, the service package holds the producer and the consumer, and the project is managed with Maven.

2. Configuration and Code

2.1 pom.xml Configuration

Add the Kafka dependency to the pom file:

<!-- Spring Boot + Kafka integration. Note: keep the versions consistent -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>

Complete pom.xml example:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.founder</groupId>
    <artifactId>springboot-xhjz</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-xhjz</name>
    <description>springboot-xhjz</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Kafka dependency -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Package the jar without the version number in its name -->
        <finalName>xhjz-kafka</finalName>
        <plugins>
            <!-- Executable jar packaging -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- version omitted: inherited from spring-boot-starter-parent (2.7.2) -->
            </plugin>
        </plugins>
    </build>

    <!-- Aliyun Maven repository -->
    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

2.2 YAML Configuration

Configure the application basics and the Kafka connection in application.yml.

server:
  port: 8081

spring:
  application:
    name: xhjz-kafka
  kafka:
    # Kafka broker address(es)
    bootstrap-servers: 172.19.82.206:9092
#    bootstrap-servers: 192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092
    producer: # producer settings
      retries: 3 # a value greater than 0 makes the client retry records whose send failed
      batch-size: 16384 # 16 KB
      buffer-memory: 33554432 # 32 MB
      acks: 1
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # the producer submits a batch to Kafka once it has accumulated batch-size bytes
      # or linger.ms milliseconds have passed, whichever comes first; with linger.ms=0
      # every record is submitted as soon as it arrives, which effectively disables batch-size
      properties:
        linger:
          ms: 100
    consumer:
      group-id: default-group # consumer group
      enable-auto-commit: false # disable offset auto-commit
      auto-offset-reset: earliest # with a committed offset for a partition, resume from it; without one, start from the beginning
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # controls when offsets are committed after the listener has processed records;
      # the possible values are listed below. manual_immediate commits as soon as
      # Acknowledgment.acknowledge() is called and is the most common choice.
      ack-mode: manual_immediate

The listener's ack-mode setting controls when offsets are committed after the consumer listener (ListenerConsumer) has processed records. Manual commit is the usual choice. The possible values are (a programmatic equivalent of this setting is sketched after the list):

  1. RECORD: commit after each individual record has been processed by the listener
  2. BATCH: commit after each batch of records returned by poll() has been processed
  3. TIME: commit after a processed batch once the time since the last commit exceeds TIME
  4. COUNT: commit after a processed batch once the number of processed records reaches COUNT
  5. COUNT_TIME: commit when either the TIME or the COUNT condition is met
  6. MANUAL: after a batch has been processed, commit once Acknowledgment.acknowledge() has been called
  7. MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called; this is the most commonly used mode
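If you prefer Java configuration over application.yml, the same ack mode can be set on the listener container factory. This is a minimal sketch, not part of the original project: the class name is illustrative, it assumes Spring Boot's auto-configured ConsumerFactory is available for injection, and note that defining this bean replaces the auto-configured factory (so yml listener settings no longer apply to it).

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaAckModeConfig {

    // equivalent to spring.kafka.listener.ack-mode: manual_immediate in application.yml
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}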

2.3 Producer and Consumer Code

Producer code

Messages are sent with the send method of the KafkaTemplate class.

  • KafkaTemplate.send is asynchronous by default; to obtain the send result synchronously, call get() on the returned future.
import com.founder.xhjz.config.AppTopicConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Kafka producer: produces messages
 */
@Component
public class KafkaProducer {
    private final static String TOPIC_NAME = "test"; // topic name

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private AppTopicConfig appTopicConfig;

    /**
     * Send messages (asynchronous by default), with or without a partition
     */
    public void send() {
        // send a string message
        // without specifying a partition
        kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");

        // specifying the partition
        // kafkaTemplate.send(TOPIC_NAME, 0, "key", "test message send~");

        // send JSON data
        // 1. build the message payload
        // Order order = new Order(orderId, "order-" + orderId, 1000.00);
        // 2. send the message
        // without specifying a partition
        // kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        // specifying the partition
        // kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
    }

    /**
     * Send a message synchronously
     */
    public void syncSendMsg() {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
        SendResult<String, String> sendResult;
        try {
            sendResult = future.get();
            // a wait limit can be set; once exceeded, we stop waiting for the result
            // SendResult<String, String> result = future.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return; // no metadata to report when the send failed
        }
        RecordMetadata metadata = sendResult.getRecordMetadata();
        System.out.println("sync send result: " + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }

    /**
     * Send a message asynchronously
     */
    public void asyncSendMsg() {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            // callback on successful send
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println(result.getProducerRecord());
                // getRecordMetadata carries the topic, partition, offset, etc.
                System.out.println(result.getRecordMetadata());
            }

            // callback on failed send
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }
        });
    }

    // send a caller-supplied message
    public void send(String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
    }
}

The consumer listener annotation

Consumers are registered with the @KafkaListener annotation. Its common attributes, illustrated by the snippet below, are:

  1. groupId: the consumer group; if omitted, the default group-id from the configuration file is used.
  2. topicPartitions: a consumer group can consume multiple topic partitions
  3. TopicPartition: describes one topic's partitions
  4. concurrency: the number of consumers in the group; it should not exceed the total partition count, since any extra consumers would sit idle.
@KafkaListener(
    groupId = "mySpringBootGroup",
    topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(
            topic = "topic2",
            partitions = "0",
            partitionOffsets = @PartitionOffset(
                partition = "1",
                initialOffset = "100"))
    },
    concurrency = "6"
)

Consumer code

listenGroup1 and listenGroup2 below are two consumers in the same group, both consuming the same messages; normally you would keep only one of them. Consumers receive messages through the @KafkaListener annotation.

  • If one topic must be consumed by several consumer groups, simply declare one @KafkaListener consumer per group.
  • The consumer can also live in a separate project.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer
 */
@Component
public class KafkaConsumer {
    static int nums = 0;

    // listener on topic "test" in consumer group "testGroup"; if groupId is omitted,
    // the default group from the yml configuration is used.
    // The two consumers in testGroup split the messages between them.
    @KafkaListener(topics = "test", groupId = "testGroup")
    public void listenGroup1(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String topic = record.topic();
        String value = record.value();
        System.out.println("----consumer 1-----" + nums++ + "---------");
        System.out.println("topic = " + topic + ", value = " + value);
        System.out.println(record);
        // manually commit the offset
        ack.acknowledge();
    }

    // the two consumers in testGroup split the messages between them
    @KafkaListener(topics = "test", groupId = "testGroup")
    public void listenGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String topic = record.topic();
        String value = record.value();
        System.out.println("----consumer 2-----" + nums++ + "---------");
        System.out.println("topic = " + topic + ", value = " + value);
        System.out.println(record);
        // manually commit the offset
        ack.acknowledge();
    }

    // To have one topic consumed by multiple consumer groups, declare one @KafkaListener per group.
    // testGroup2 has a single consumer, so it receives every message.
    @KafkaListener(topics = "test", groupId = "testGroup2")
    public void listenGroup3(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String topic = record.topic();
        String value = record.value();
        System.out.println("topic = " + topic + ", value = " + value);
        System.out.println(record);
        ack.acknowledge();
    }
}

2.4 Producer Controller

The Kafka producer controller is simply a Controller that exposes HTTP endpoints for triggering message production.

import com.founder.xhjz.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Producer controller: HTTP endpoints for producing messages
 */
@RestController
@RequestMapping("/api")
public class KafkaProdController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @RequestMapping("/send")
    public void send() {
        kafkaProducer.send();
    }

    // send a caller-supplied message
    @GetMapping("/send/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        kafkaProducer.send(msg);
    }
}

3. Testing Production and Consumption

Start the Spring Boot service and call the producer endpoint http://localhost:8081/api/send three times; the console shows each message being handled by two consumers (one from testGroup plus the one in testGroup2).

---------0---------
topic = test, value = test message send~
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1659508905348, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1659508905348, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
---------1---------
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1659508922093, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1659508922093, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
topic = test, value = test message send~
---------2---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1659508940922, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1659508940922, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)

Disable the testGroup2 consumer by removing or commenting out @KafkaListener(topics = "test", groupId = "testGroup2"), then restart the service and call the producer endpoint http://localhost:8081/api/send three more times; the console now shows each message handled by a single consumer.

---------0---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 24, CreateTime = 1659509111424, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
---------1---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 25, CreateTime = 1659509114522, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)
---------2---------
topic = test, value = test message send~
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1659509116559, serialized key size = 3, serialized value size = 18, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = test message send~)

Next call the endpoint http://localhost:8081/api/send/111111 to send a specific message; the console shows the message 111111 being processed by the consumer.

---------3---------
topic = test, value = 111111
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 35, CreateTime = 1659511200699, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 111111)

4. Synchronous and Asynchronous Sending

4.1 Sending Synchronously

To send synchronously, call get() on the value returned by each send(): send() returns a Future, and Future.get() blocks until the task completes and the result is available, i.e. until the message has been sent successfully.

  • A synchronous send blocks the caller while waiting for the result (successful delivery), returning an error on timeout.

Add the synchronous send method syncSendMsg() to the KafkaProducer class.

/**
 * Send a message synchronously
 */
public void syncSendMsg() {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
    SendResult<String, String> sendResult;
    try {
        sendResult = future.get();
        // a wait limit can be set; once exceeded, we stop waiting for the result
        // SendResult<String, String> result = future.get(3, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return; // no metadata to report when the send failed
    }
    RecordMetadata metadata = sendResult.getRecordMetadata();
    System.out.println("sync send result: " + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}

4.2 Sending Asynchronously

With asynchronous sending, the producer hands a batch of messages to the topic once the accumulated messages reach batch.size or the accumulation time exceeds linger.ms, whichever comes first.

  • Note: batch.size defaults to 16384 and linger.ms defaults to 0.
  • An asynchronous send (the default behavior) returns immediately.

How do you confirm the outcome of an asynchronously sent message?

Example 1

/**
 * Send a message asynchronously
 */
public void asyncSendMsg() {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        // callback on successful send
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println(result.getProducerRecord());
            // getRecordMetadata carries the topic, partition, offset, etc.
            System.out.println(result.getRecordMetadata());
        }

        // callback on failed send
        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
        }
    });
}

Example 2

Alternatively, register a listener on the template: define a send-result listener class that implements the ProducerListener interface.

  • onSuccess is called back when a message is sent successfully.
  • onError is called back when a send fails.
import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;

@Configuration
public class KafkaListener {
    private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    @Autowired
    KafkaTemplate kafkaTemplate;

    // register the send-result listener on the template
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                logger.info("ok,message={}", producerRecord.value());
            }

            // since spring-kafka 2.4 the onError callback also receives the RecordMetadata
            @Override
            public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
                logger.error("error!message={}", producerRecord.value());
            }
        });
    }
}

5. Partitioning Strategies

The partitioning strategy decides which partition a message is written to based on its key, and it is the foundation of ordered consumption.

  • If a partition number is given, the record is sent straight to that partition.
  • If no partition is given but a key is, the partition is derived from the key's hash (see the sketch after this list).
  • If neither a partition nor a key is given, records are spread across partitions round-robin (the default).
  • With a custom partitioner, you decide the rule yourself.
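As a quick illustration of the key-based rule, the snippet below reproduces the hash computation Kafka's default partitioner applies to a keyed record (murmur2 hash modulo the partition count, via Kafka's own Utils class). It is a standalone sketch: the partition count of 2 is an assumption about the test topic, and the real DefaultPartitioner has extra logic for unkeyed records.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class KeyPartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 2; // assumed partition count of the topic
        for (String key : new String[]{"a", "b", "a"}) {
            // same computation the default partitioner uses for keyed records
            int partition = Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions;
            System.out.println("key=" + key + " -> partition " + partition);
        }
    }
}

Running it shows the property the next section verifies end to end: the same key always maps to the same partition.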

5.1 Verifying the Default Partitioning Rules

Producer code for reference: PartitionProducer.java

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

// test endpoints for partitioned sending
@RestController
public class PartitionProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    // send to an explicit partition:
    // whatever the key is, the record goes to partition 0
    @GetMapping("/kafka/partitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=partition 0 specified");
    }

    // send with a key but no partition:
    // the key is hashed, so identical keys land in the same partition
    @GetMapping("/kafka/keysend/{key}")
    public void setKey(@PathVariable("key") String key) {
        kafkaTemplate.send("test", key, "key=" + key + ",msg=no partition specified");
    }
}

Consumer code: PartitionConsumer.java

import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionConsumer {
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);

    // consume only partition 0 of the topic
    @KafkaListener(topicPartitions = @TopicPartition(topic = "test", partitions = "0"))
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=0,message:[{}]", msg);
        }
    }

    // consume only partition 1 of the topic
    @KafkaListener(topicPartitions = @TopicPartition(topic = "test", partitions = "1"))
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=1,message:[{}]", msg);
        }
    }
}

Calling setKey (the endpoint that only supplies a key), you can see that identical keys are hashed to the same partition:

[Screenshot: console log showing messages with the same key consumed from the same partition]

Then call setPartition, which fixes the partition number to 0; regardless of the key, everything is consumed from partition 0:

[Screenshot: console log showing all messages consumed from partition 0]

5.2 Custom Partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // our own partitioning rule:
        // keys starting with "0" go to partition 0, everything else to partition 1
        String keyStr = String.valueOf(key);
        if (keyStr.startsWith("0")) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class MyPartitionTemplate {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    KafkaTemplate kafkaTemplate;

    @PostConstruct
    public void setKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // the custom partitioner is plugged in here
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
    }

    public KafkaTemplate getKafkaTemplate() {
        return kafkaTemplate;
    }
}

The sending endpoint: MyPartitionProducer.java.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

// test endpoint for the custom partitioning strategy
@RestController
public class MyPartitionProducer {

    @Autowired
    MyPartitionTemplate template;

    // send messages whose keys start with "0" and with any other character,
    // then check the console to see which partition each lands in
    @GetMapping("/kafka/myPartitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        template.getKafkaTemplate().send("test", key, "key=" + key + ",msg=custom partitioning strategy");
    }
}
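Hand-building a second KafkaTemplate works, but as a design alternative the custom partitioner can also be attached to Spring Boot's auto-configured producer factory, so the normally injected KafkaTemplate uses it. This is a sketch of that approach, assuming Spring Boot 2.3+ auto-configuration; the class name is illustrative.

import java.util.Collections;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyPartitionerCustomizerConfig {

    // adds partitioner.class to the auto-configured DefaultKafkaProducerFactory
    @Bean
    public DefaultKafkaProducerFactoryCustomizer partitionerCustomizer() {
        return factory -> factory.updateConfigs(Collections.singletonMap(
                ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class));
    }
}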

Sending keys that start with 0 and keys that start with anything else, you can see that the 0-prefixed keys go to partition 0 and all the others are routed to partition 1:

[Screenshot: console log showing 0-prefixed keys in partition 0 and all other keys in partition 1]
