Kafka学习笔记
Kafka学习笔记
安装
1
brew install kafka启动
1
2
3
4
5# 启动zk 脚本位于/usr/local/opt/kafka/bin/
zookeeper-server-start -daemon /usr/local/etc/kafka/zookeeper.properties
# 启动kafka 脚本位于/usr/local/opt/kafka/bin/
kafka-server-start -daemon /usr/local/etc/kafka/server.properties创建topic与查看主题信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26# 创建topic
kafka-topics --create --topic quickstart-events --bootstrap-server localhost:9092
# 2.2以下使用 如下命令,基本已弃用
kafka-topics --create --topic test-events --zookeeper localhost:2181
# 参数说明
# kafka节点,集群用逗号隔开
--bootstrap-server localhost:9092
# 副本数
--replication-factor 1
# 指定分区
--partitions 1
# 查看topic信息
kafka-topics --describe --topic quickstart-events --bootstrap-server localhost:9092
# topic信息如下
Topic: quickstart-events TopicId: ldMRGNAoQGOcN0xwH9iZkQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
# 查看主题列表
kafka-topics --list --bootstrap-server localhost:9092
# 删除主题,不能直接在zk客户端删除,删除后,通过命令还是查询到主题,而且用下面的命令删除会出现异常
kafka-topics --bootstrap-server localhost:9092 --topic mytest --delete生产消息
1
2
3
4
5
6
7# Ctrl+C 取消生产
➜ ~ kafka-console-producer --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
kafka-console-producer --topic quickstart-events --bootstrap-server 10.60.0.65:6667当topic存在多个分区时,同时发送的时候没有指定分区,则根据hash算法计算投递到哪个分区,hash(key)%partitionNum,因此保证消息有序性可以通过相同的key来保证
也可以通过指定partition保证有序性
生产者同步发送与异步发送
同步发送,需要等待kafka的ack通知
异步发送,kafka收到消息后会通过callback通知生产者,==会存在消息丢失==
同步发送的ack配置
ack=0 无需任何Broker接收到消息,立刻返回ack
ack=1 多副本的Leader已经收到消息并且数据已经写到本地磁盘,返回ack
ack=-1 集群中所有的Broker都收到了消息并写入磁盘,返回ack
同步的副本数据配置:min.insync.replicas(默认是1,推荐配置大于等于2),Leader和Follower都算副本
生产者将消息发送到Broker,并保存到磁盘
/usr/local/var/lib/kafka-logs/topic-partition/00000000000000000000.log消费消息
1
2
3
4
5
6# 该命令不会删除消息
# 从头消费
kafka-console-consumer --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# 从offset+1开始消费
kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092消费者自动提交和手动提交offset
- 自动提交:消息被消费者poll下来就立即提交offset,会丢消息
- 手动提交:消费者消费消息时/后,手动提交offset
- 手动同步提交:提交offset后,需要等到Broker确认后返回ack,才能继续向下执行
- 手动异步提交:提分offset后,无需等待Broker确认,Broker会通过回调onComplete通知消费者
==前者是老版本的用法,0.8以前的kafka,消费的进度(offset)是写在zk中的,所以consumer需要知道zk的地址。后来的版本都统一由broker管理,所以就用bootstrap-server了。==
单播消息
同一个消费者组中的消费者,只能有一个能接收到同一个topic生产者发送过来的消息
1 | |
多播消息
不同的消费者组订阅同一个topic,那么不同的消费者组中只有一个消费者能收到消息
1 | |
消费者组
1 | |
主题和分区
- 主题topic的作用是将消息分类,根据不同的场景将消息分类存储
- 因为kafka的消息时存储在log文件中,如果不进行分区,文件会越来越大,会导致查询和迁移数据十分困难,分区的作用是为了拆分文件,同时分区还能提高读写的吞吐量
kafka默认主题
kafka会自动创建一个__consumer_offsets的主题,该主题的作用是记录其他主题的offset
- 默认50个分区(可修改),提高并发
- 消费者消费消息后会上报offset
- 提交到哪个分区是根据hash算法实现,hash(consumerGroupId)%__consumer_offsets分区数
- 提交的内容是,key:consumerGroupId+topic+partition,value:当前offset的值
监控
jconsole
监控到 Lag 越来越大,说明消费者程序变得越来越慢了,至少是追不上生产者程序了
一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了(因为消息保存有时限,说明消息最近一直未被消费)。

Kafka-eagle
配置环境变量
1
2
3echo 'export KE_HOME=/Users/unclebryan/DevTools/efak-web-2.1.0' >> ~/.zshrc
echo 'export PATH=$PATH:$KE_HOME/bin' >> ~/.zshrc
source ~/.zshrc修改配置:
1
2
3
4
5
6
7
8# 修改zk配置
efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
#修改mysql配置 数据库会自动创建
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456启动
1
2
3
4
5ke.sh start
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.0.172:8048'
* Account:admin ,Password:123456
配置文件
kafka全局配置 /usr/local/etc/kafka/server.properties
1 | |
zk配置 zookeeper.properties
1 | |
Kafka整合SpringBoot
==下图的消费者,其实是poll了一批数据,针对的是每一条数据的操作==

开发环境KAFKA
1 | |