Kafka Learning Notes

  • Installation

    brew install kafka
  • Startup

    # Start ZooKeeper; the scripts live in /usr/local/opt/kafka/bin/
    zookeeper-server-start -daemon /usr/local/etc/kafka/zookeeper.properties

    # Start Kafka; the scripts live in /usr/local/opt/kafka/bin/
    kafka-server-start -daemon /usr/local/etc/kafka/server.properties
  • Creating a topic and viewing topic information

    # Create a topic
    kafka-topics --create --topic quickstart-events --bootstrap-server localhost:9092

    # For Kafka versions below 2.2, use the command below; this form is essentially deprecated
    kafka-topics --create --topic test-events --zookeeper localhost:2181

    # Parameter notes
    # Kafka node(s); for a cluster, separate multiple nodes with commas
    --bootstrap-server localhost:9092
    # Number of replicas
    --replication-factor 1
    # Number of partitions
    --partitions 1

    # Describe a topic
    kafka-topics --describe --topic quickstart-events --bootstrap-server localhost:9092

    # Sample output
    Topic: quickstart-events TopicId: ldMRGNAoQGOcN0xwH9iZkQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
    Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    # List topics
    kafka-topics --list --bootstrap-server localhost:9092

    # Delete a topic. It cannot be deleted directly in the ZooKeeper client: after doing so, the topic still shows up when listed, and deleting it with the command below will then throw an exception
    kafka-topics --bootstrap-server localhost:9092 --topic mytest --delete
  • Producing messages

    # Ctrl+C to stop producing
    ➜ ~ kafka-console-producer --topic quickstart-events --bootstrap-server localhost:9092
    >This is my first event
    >This is my second event


    kafka-console-producer --topic quickstart-events --bootstrap-server 10.60.0.65:6667

    When a topic has multiple partitions and no partition is specified on send, the target partition is computed with a hash of the key: hash(key) % partitionNum. Message ordering can therefore be guaranteed by giving related messages the same key, since they all land in the same partition.

    Ordering can also be guaranteed by specifying the partition explicitly; see the sketch below.
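
    A minimal Java (kafka-clients) sketch of both approaches, assuming a local broker at localhost:9092 and the quickstart-events topic; the key "order-1001" is only an illustrative example:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Same key -> same partition (hash(key) % partitionNum), so these two stay ordered
                producer.send(new ProducerRecord<>("quickstart-events", "order-1001", "created"));
                producer.send(new ProducerRecord<>("quickstart-events", "order-1001", "paid"));

                // Or pin the partition explicitly (partition 0 here)
                producer.send(new ProducerRecord<>("quickstart-events", 0, "order-1001", "shipped"));
            }
        }
    }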

  • Producer synchronous vs. asynchronous send

    Synchronous send: the producer waits for Kafka's ack before continuing.

    Asynchronous send: Kafka notifies the producer through a callback after receiving the message. ==Messages can be lost.==

    • acks settings for synchronous send

      acks=0: the ack is returned immediately, without waiting for any broker to receive the message

      acks=1: the ack is returned once the partition leader has received the message and written it to its local log

      acks=-1 (all): the ack is returned once all in-sync replicas (ISR) have received the message and written it to disk

      The required number of in-sync replicas is configured with min.insync.replicas (default 1; ≥2 is recommended). Both the leader and the followers count as replicas. See the sketch below.
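
    A Java (kafka-clients) sketch of both send styles and where acks is configured, assuming localhost:9092 and the quickstart-events topic:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SyncAsyncProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // acks: "0" = don't wait, "1" = leader has written, "all"/"-1" = all in-sync replicas have written
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record = new ProducerRecord<>("quickstart-events", "hello kafka");

                // Synchronous send: block on the Future until the broker acks
                RecordMetadata meta = producer.send(record).get();
                System.out.printf("sync: %s-%d@%d%n", meta.topic(), meta.partition(), meta.offset());

                // Asynchronous send: returns immediately; the result arrives via the callback,
                // so the message can be lost if a failure is never handled or retried
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("async: offset %d%n", metadata.offset());
                    }
                });
                producer.flush();
            }
        }
    }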

  • The producer sends messages to the broker, which persists them to disk at /usr/local/var/lib/kafka-logs/topic-partition/00000000000000000000.log

  • Consuming messages

    # This command does not delete messages
    # Consume from the beginning
    kafka-console-consumer --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

    # Consume starting from the current offset + 1
    kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092
  • Consumer auto-commit vs. manual offset commit

    1. Auto-commit: the offset is committed as soon as the consumer polls the messages, so messages can be lost (the offset may already be committed before processing succeeds)
    2. Manual commit: the consumer commits the offset itself while/after processing the messages (see the sketch below)
      • Manual synchronous commit: after committing the offset, the consumer waits for the broker's ack before continuing
      • Manual asynchronous commit: after committing the offset, there is no need to wait for the broker; the broker notifies the consumer through the onComplete callback
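
    A Java (kafka-clients) sketch of manual commits, assuming localhost:9092, the quickstart-events topic and the group testGroup:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // turn off auto-commit

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("quickstart-events"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                    if (!records.isEmpty()) {
                        consumer.commitSync();   // synchronous commit: blocks until the broker confirms
                        // Asynchronous alternative: returns immediately, result comes back via onComplete
                        // consumer.commitAsync((offsets, exception) -> {
                        //     if (exception != null) exception.printStackTrace();
                        // });
                    }
                }
            }
        }
    }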

==The --zookeeper form is the older usage: in old Kafka versions (0.8 and earlier), consumer offsets were stored in ZooKeeper, so the consumer needed the ZooKeeper address. Later versions manage offsets on the brokers, hence --bootstrap-server.==

Unicast messages

Within the same consumer group, only one consumer receives a given message that the producer sends to the topic.

kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092 --consumer-property group.id=testGroup 

Multicast messages

When different consumer groups subscribe to the same topic, each group receives the messages, and within each group only one consumer receives a given message.

kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 

kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2

Consumer groups

# List consumer groups
kafka-consumer-groups --bootstrap-server localhost:9092 --list
# Output
testGroup

# Describe a specific consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testGroup
# Output
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup quickstart-events 0 28 28 0 console-consumer-e63230ea-d567-4a2a-8b9c-eb7e0636d9e4 /127.0.0.1 console-consumer
# Output when the consumer is down
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup quickstart-events 0 28 37 9 - - -

# CURRENT-OFFSET: the offset consumed so far
# LOG-END-OFFSET: the offset of the latest message in the log
# LAG: how many messages remain unconsumed
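
A Java (AdminClient) sketch of computing the same LAG value programmatically, assuming kafka-clients 2.5+ (for listOffsets with OffsetSpec), a broker at localhost:9092 and the group testGroup:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // CURRENT-OFFSET per partition for the group
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("testGroup").partitionsToOffsetAndMetadata().get();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                // LOG-END-OFFSET for the same partition
                ListOffsetsResult.ListOffsetsResultInfo end = admin
                        .listOffsets(Map.of(e.getKey(), OffsetSpec.latest()))
                        .partitionResult(e.getKey()).get();
                long lag = end.offset() - e.getValue().offset();   // LAG = LOG-END-OFFSET - CURRENT-OFFSET
                System.out.printf("%s lag=%d%n", e.getKey(), lag);
            }
        }
    }
}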

Topics and partitions

  1. A topic classifies messages, so that messages for different scenarios are stored separately
  2. Kafka messages are stored in log files; without partitioning, a single file keeps growing, which makes querying and migrating data very difficult. Partitions split the log into smaller files and also increase read/write throughput (see the sketch below).
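
A minimal Java (AdminClient) sketch that creates a topic with several partitions; the topic name order-events and the counts are arbitrary examples:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (single-broker dev setup)
            NewTopic topic = new NewTopic("order-events", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}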

Kafka's default topic

Kafka automatically creates a topic named __consumer_offsets, which records the consumed offsets of the other topics.

  • 50 partitions by default (configurable), to improve concurrency
  • Consumers report their offsets to it after consuming messages
  • The target partition is chosen by hashing: hash(consumerGroupId) % (number of __consumer_offsets partitions); see the sketch below
  • The committed record is key: consumerGroupId+topic+partition, value: the current offset
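
A quick Java sketch of that partition calculation (Kafka uses its own abs() internally, but for typical group ids the result is the same):

public class OffsetsPartition {
    public static void main(String[] args) {
        String groupId = "testGroup";
        int numPartitions = 50;   // default partition count of __consumer_offsets
        // hash(consumerGroupId) % number of __consumer_offsets partitions
        int partition = Math.abs(groupId.hashCode()) % numPartitions;
        System.out.println("offsets for " + groupId + " live in __consumer_offsets-" + partition);
    }
}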

Monitoring

jconsole

If the monitored Lag keeps growing, the consumer application is getting slower and slower, or at the very least can no longer keep up with the producer.

If you observe Lead shrinking, or getting close to 0, be careful: it may mean the consumer side is about to lose messages (messages are only retained for a limited time, and a small Lead means the oldest messages have gone unconsumed for a while).


Kafka-eagle

  1. Download: http://download.kafka-eagle.org/

  2. Configure environment variables

    echo 'export KE_HOME=/Users/unclebryan/DevTools/efak-web-2.1.0' >> ~/.zshrc
    echo 'export PATH=$PATH:$KE_HOME/bin' >> ~/.zshrc
    source ~/.zshrc
  3. Edit the configuration:

    # ZooKeeper settings
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=localhost:2181
    # MySQL settings; the database is created automatically
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=123456
  4. Start

    ke.sh start

    * EFAK Service has started success.
    * Welcome, Now you can visit 'http://192.168.0.172:8048'
    * Account:admin ,Password:123456

Configuration files

Kafka global configuration: /usr/local/etc/kafka/server.properties

broker.id=0
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# Message storage directory
log.dirs=/usr/local/var/lib/kafka-logs

# The default number of log partitions per topic. More partitions allow greater parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=1
num.recovery.threads.per.data.dir=1

# Number of replicas
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1


# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Time limit for message retention; messages older than this are deleted
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# Size limit for message retention; data beyond this is deleted
log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Zookeeper connection string: a comma separated list of host:port pairs, each corresponding to a zk server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

ZooKeeper configuration: zookeeper.properties

# the directory where the snapshot is stored.
dataDir=/usr/local/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

Integrating Kafka with Spring Boot

==The consumer here actually polls a batch of records; the listener logic is applied to each individual record.==

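A hedged Spring Kafka sketch of such a per-record listener, assuming the spring-kafka dependency and manual ack mode (spring.kafka.listener.ack-mode=manual, enable-auto-commit=false); the class name and group id are illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class QuickstartListener {

    // The container polls a batch of records, but this method is invoked once per record
    @KafkaListener(topics = "quickstart-events", groupId = "testGroup")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
        ack.acknowledge();   // manual offset commit
    }
}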

Development-environment Kafka

ssh root@10.60.0.65

# Monitoring
http://10.60.0.65:8048

bootstrap-servers 10.60.0.65:6667,10.60.0.66:6667,10.60.0.67:6667
