一、Apache Kafka快速入门
Apache Kafka(https://kafka.apache.org)是一个开源分布式事件流平台,设计为处理实时数据提供一个统一、高吞吐、低延迟的平台,其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,被广泛应用于高性能数据管道、流分析、数据集成和关键任务应用程序。主要应用场景是:日志收集、消息系统、用户活动跟踪、运营指标、流式处理等.
kafaka 生产消费模型
1.1 几个主要概念
Topic :Kafka将消息按照主题进行分类,每个主题可以包含多个消息。主题是Kafka中消息的逻辑容器,生产者将消息发布到特定的主题,消费者订阅感兴趣的主题来消费消息。;
Producer :消息生产者,就是向 kafka broker 发消息的客户端;
Consumer:消息消费者,向 kafka broker 取消息的客户端;
Consumer Group(消费者组):多个消费者可以组成一个消费者组,共同消费一个或多个主题的消息。消费者组可以实现消息的并行处理和负载均衡,每个分区只能由同一个消费者组中的一个消费者进行消费,消费者组可以在消费者客服端指定。
Broker :Kafka集群中的服务器,负责存储和处理消息。一个集群可以由多个Broker组成,一个 broker可以容纳多个 topic。
1.2 单机版Kafka安装部署
Kafka需要使用ZooKeeper提供了分布式协调和管理的能力来实现集群的配置管理、Leader选举、Broker注册和发现以及副本管理等功能,帮助Kafka构建一个可靠、高可用的分布式消息系统。
安装过程中可以选择使用Kafka安装介质自带的ZooKeeper,也可以使用独立的ZooKeeper集群。
需要在Kafka的配置文件config/server.properties中设置zookeeper.connect属性即可。
介质下载
从kafaka官网下载安装介质(https://kafka.apache.org/downloads)(这里以kafka_2.12-3.6.0.tgz为例),下载完以后解压截止到某一目录即可。
zk配置启动
在bin目录有启动zk的脚本,可以在config配置文件配置下zk的启动端口等配置。
1
| ./zookeeper-server-start.sh ./../config/zookeeper.properties &
|
kafaka启动
第一个是start.sh位置, 第二个是server.rpoperties的位置,所以确认好路径的正确性。
1
| ./kafka-server-start.sh ./../config/server.properties &
|
关闭kafaka使用如下命令即可
1
| sh kafka-server-stop.sh -daemon ../config/server.properties
|
判断zk和kafaka是否启动成功
1 2
| netstat -tunlp|egrep "(2181|9092)" jps
|
1.3 Kafka使用入门
创建topic
1
| ./kafka-topics.sh --create --bootstrap-server 192.168.137.80:9092 --replication-factor 1 --partitions 1 --topic test
|
bootstrap-server,Broker 的服务地址,为server.properties中配置的listeners=PLAINTEXT://192.168.137.80:9092,,这里可以修改为配置的主机地址。
显示 topic
1
| sh ./bin/kafka-topics.sh --list --bootstrap-server 192.168.137.80:9092
|
查看topic属性
1
| sh kafka-topics.sh --describe --bootstrap-server 192.168.137.80:9092 --topic test
|
发送消息
1
| ./kafka-console-producer.sh --broker-list 192.168.137.80:9092 --topic test
|
消费消息
1
| ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.80:9092 --topic test --from-beginning
|
查看消费者组主具体信息
1
| ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.137.80:9092 --describe --group webGroup11
|
TOPIC:消费主题名称
PARTITION:分区Id
CURRENT-OFFSET: 当前消费偏移量,即消费者当前消费到的消息的偏移量
LOG-END-OFFSET: 分区的最新偏移量,即分区中最新消息的偏移量
LAG:消费者落后于最新偏移量的消息数量,即LAG = LOG-END-OFFSET - CURRENT-OFFSET(最新-当前)
CONSUMER-ID:消费者ID
HOST:消费者所在主机
CLIENT-ID:消费者客户端ID
修改偏移量
1
| ./kafka-consumer-groups.sh --bootstrap-server 192.168.137.80:9092 --group webGroup11 --topic topic_comment --reset-offsets --to-offset 63 --execute
|
二、Spring Boot集成 Apache Kafka
示例的 springboot的版本是:3.1.4。
1 2 3 4 5 6
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.4</version> <relativePath/> </parent>
|
引入kafaka依赖,配置application.yml
1 2 3 4
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12
| spring: application: name: comment-privoder kafka: consumer: bootstrap-servers: 192.168.137.80:9092 auto-offset-reset: earliest producer: bootstrap-servers: 192.168.137.80:9092 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
消息发送
使用KafkaTemplate
来进行消息的发送:
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
发送消息的方法体:
1 2 3 4
| @GetMapping("/send") public void sendMessage(String input) { this.kafkaTemplate.send("topic_comment", input); }
|
消息接收
在方法体中加入@KafkaListener
注解来实现消息的接收:
1 2 3 4
| @KafkaListener(id = "webGroup11", topics = "topic_comment") public void listen(String input) { System.out.printf(input); }
|
其中id为Consumer Group(消费者组)id,可以自定义,消费者组实现了消息的并行处理和负载均衡,一个topic消息只能由同一消费者组中的一个消费者来消费。