程序员在旅途

用这生命中的每一秒,给自己一个不后悔的未来!

0%

Apache Kafka快速入门及Spring Boot集成案例

一、Apache Kafka快速入门

Apache Kafka(https://kafka.apache.org)是一个开源分布式事件流平台,设计为处理实时数据提供一个统一、高吞吐、低延迟的平台,其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,被广泛应用于高性能数据管道、流分析、数据集成和关键任务应用程序。主要应用场景是:日志收集、消息系统、用户活动跟踪、运营指标、流式处理等.

kafaka 生产消费模型

报错截图

1.1 几个主要概念

Topic​​ :Kafka将消息按照主题进行分类,每个主题可以包含多个消息。主题是Kafka中消息的逻辑容器,生产者将消息发布到特定的主题,消费者订阅感兴趣的主题来消费消息。;
​​Producer​​ :消息生产者,就是向 kafka broker 发消息的客户端;
​​Consumer​​:消息消费者,向 kafka broker 取消息的客户端;
Consumer Group(消费者组):多个消费者可以组成一个消费者组,共同消费一个或多个主题的消息。消费者组可以实现消息的并行处理和负载均衡,每个分区只能由同一个消费者组中的一个消费者进行消费,消费者组可以在消费者客服端指定。
Broker​​ :Kafka集群中的服务器,负责存储和处理消息。一个集群可以由多个Broker组成,一个 broker可以容纳多个 topic。

1.2 单机版Kafka安装部署

Kafka需要使用ZooKeeper提供了分布式协调和管理的能力来实现集群的配置管理、Leader选举、Broker注册和发现以及副本管理等功能,帮助Kafka构建一个可靠、高可用的分布式消息系统。
安装过程中可以选择使用Kafka安装介质自带的ZooKeeper,也可以使用独立的ZooKeeper集群。
需要在Kafka的配置文件config/server.properties中设置zookeeper.connect属性即可。

介质下载

从kafaka官网下载安装介质(https://kafka.apache.org/downloads)(这里以kafka_2.12-3.6.0.tgz为例),下载完以后解压截止到某一目录即可。

zk配置启动

在bin目录有启动zk的脚本,可以在config配置文件配置下zk的启动端口等配置。

1
./zookeeper-server-start.sh ./../config/zookeeper.properties &
kafaka启动

第一个是start.sh位置, 第二个是server.rpoperties的位置,所以确认好路径的正确性。

1
./kafka-server-start.sh ./../config/server.properties &
关闭kafaka使用如下命令即可
1
sh kafka-server-stop.sh -daemon ../config/server.properties
判断zk和kafaka是否启动成功
1
2
netstat -tunlp|egrep "(2181|9092)"
jps

1.3 Kafka使用入门

创建topic
1
./kafka-topics.sh --create --bootstrap-server 192.168.137.80:9092 --replication-factor 1 --partitions 1 --topic test

bootstrap-server,Broker​​ 的服务地址,为server.properties中配置的listeners=PLAINTEXT://192.168.137.80:9092,,这里可以修改为配置的主机地址。

显示 topic
1
sh ./bin/kafka-topics.sh --list --bootstrap-server 192.168.137.80:9092
查看topic属性
1
sh kafka-topics.sh --describe --bootstrap-server 192.168.137.80:9092 --topic test
发送消息
1
./kafka-console-producer.sh --broker-list 192.168.137.80:9092 --topic test
消费消息
1
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.80:9092 --topic test --from-beginning
查看消费者组主具体信息
1
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.137.80:9092 --describe --group webGroup11

TOPIC:消费主题名称
PARTITION:分区Id
CURRENT-OFFSET: 当前消费偏移量,即消费者当前消费到的消息的偏移量
LOG-END-OFFSET: 分区的最新偏移量,即分区中最新消息的偏移量
LAG:消费者落后于最新偏移量的消息数量,即LAG = LOG-END-OFFSET - CURRENT-OFFSET(最新-当前)
CONSUMER-ID:消费者ID
HOST:消费者所在主机
CLIENT-ID:消费者客户端ID

修改偏移量
1
./kafka-consumer-groups.sh --bootstrap-server 192.168.137.80:9092 --group webGroup11 --topic topic_comment --reset-offsets --to-offset 63 --execute

二、Spring Boot集成 Apache Kafka

示例的 springboot的版本是:3.1.4。

1
2
3
4
5
6
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
引入kafaka依赖,配置application.yml
1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
spring:
application:
name: comment-privoder
kafka:
consumer:
bootstrap-servers: 192.168.137.80:9092
# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
auto-offset-reset: earliest
producer:
bootstrap-servers: 192.168.137.80:9092
# 发送的对象信息变为json格式
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
消息发送

使用KafkaTemplate来进行消息的发送:

@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;

发送消息的方法体:

1
2
3
4
@GetMapping("/send")
public void sendMessage(String input) {
this.kafkaTemplate.send("topic_comment", input); //topic_comment为topic名称,需要提前创建
}
消息接收

在方法体中加入@KafkaListener注解来实现消息的接收:

1
2
3
4
@KafkaListener(id = "webGroup11", topics = "topic_comment")
public void listen(String input) {
System.out.printf(input);
}

其中id为Consumer Group(消费者组)id,可以自定义,消费者组实现了消息的并行处理和负载均衡,一个topic消息只能由同一消费者组中的一个消费者来消费。