Kafka常用集群配置参数

news/2024/12/25 1:43:31 标签: kafka, 分布式

Broker

log.dirs

这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。比如/home/kafka1,/home/kafka2,/home/kafka3这样

log.dirs = /home/kafka1,/home/kafka2,/home/kafka3

如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。

1、可以提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。

2、能够实现故障转移:即 Failover。只要 Kafka Broker 使用的任何一块磁盘挂掉了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

log.dir :注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

zookeeper.connect

单个Kafka 集群使用,直接配置为

zookeeper.connect = zk1:2181,zk2:2181,zk3:2181

多个 Kafka 集群使用同一套 ZooKeeper 集群

比如 两套 Kafka 集群分别为kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:
kafka1 中配置:

zookeeper.connect = zk1:2181,zk2:2181,zk3:2181/kafka1

kafka2 中配置:

zookeeper.connect = zk1:2181,zk2:2181,zk3:2181/kafka2

监听器

listeners : 监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。

注意 :主机名这个设置中到底使用 IP 地址还是主机名。最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。不然可能会发生无法连接的问题

advertised.listeners :和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。

Topic相关

auto.create.topics.enable: 是否允许自动创建 Topic。

最好设置成 false,即不允许自动创建 Topic。在线上环境里面有很多名字稀奇古怪的 Topic,大概都是因为该参数被设置成了 true 的缘故。

unclean.leader.election.enable: 是否允许 Unclean Leader 选举。

关闭 Unclean Leader 选举的。何谓 Unclean? Kafka 有多个副本,每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。

那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。即防止非lsr副本当选为Leader副本。

**auto.leader.rebalance.enable:**是否允许定期进行 Leader 选举。

置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。

要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此建议你在生产环境中把这个参数设置成 false

数据留存

log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。

log.retention.hours=168表示默认保存 7 天的数据,自动删除 7 天前的数据

log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以。

这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。

**message.max.bytes:**控制 Broker 能够接收的最大消息大小。

message.max.bytes 默认的 1000012 太少了,还不到 1MB。

实际场景中突破 1MB 的消息都是很常见的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它仅仅是衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。

TOPIC

topic配置优先级

如果同时设置了 Topic 级别参数和全局 Broker 参数,哪个参数生效?

Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值

Topic 的数据保存时间

retention.ms:Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

retention.bytes:要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

max.message.bytes: 决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。很多公司都把 Kafka 作为一个基础架构组件来运行,上面跑了很多的业务数据。如果在全局层面上,不好给出一个合适的最大消息值,那么不同业务能够自行设定自己业务 Topic 级别参数就显得非常必要了。

创建 Topic 时进行设置

限制5MB,一般消息不会超过 5MB

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bigData --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

修改 Topic 时设置

使用kafka-configs来修改 Topic 级别参数,修改成限制10MB

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name bigData --alter --add-config max.message.bytes=10485760

JVM 参数

  1. 如果是Jdk8 且有资源配置较高 JVM 堆大小设置成 6GB ,垃圾回收选择G1;
  2. 如果 Broker 所在机器的内存不是很充足,CPU 资源非常空闲,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
  3. 上面两个都不满足,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
$> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

KAFKA_HEAP_OPTS:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

操作系统参数

文件描述符限制

经常看到“Too many open files”的错误。直接给大点ulimit -n 1000000

  1. 临时设置 ulimit -n ,仅在当前终端会话中生效,重启后失效。
# 设置
ulimit -n 1000000
# 验证设置是否成功
ulimit -n

  1. 永久设置 ulimit -n
    修改 /etc/sysctl.conf
sudo nano /etc/sysctl.conf
#添加内容
fs.file-max = 1000000
# 配置生效
sudo sysctl -p
# 查看
cat /proc/sys/fs/file-max

文件系统类型

指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。XFS 的性能要强于 ext4,所以生产环境性能要求很高,最好还是使用 XFS。

# 查看可用磁盘
1. lsblk 
# 创建分区并格式化为 XFS
2. sudo mkfs.xfs /dev/sdb
# 挂载到 Kafka 数据目录 log.dirs=/data/kafka-logs
3. sudo mkdir -p /data/kafka-logs
sudo mount /dev/sdb /data/kafka-logs
# 验证挂载和文件系统类型
4. df -hT /data/kafka-data
# 配置开机自动挂载并优化: 编辑 /etc/fstab,添加:
#noatime: 禁止访问时间更新,提高性能。
#nodiratime: 禁止目录访问时间更新。
#allocsize=512m: 增大文件的预分配大小,减少碎片化。
5. /dev/sdb  /data/kafka-logs  xfs  
defaults,noatime,nodiratime,allocsize=512m  0  0


Swappiness

将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。如果设置为0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。

  1. 临时修改(无需重启)
sudo sysctl vm.swappiness=1
cat /proc/sys/vm/swappiness

  1. 永久修改(重启后生效)
    编辑系统配置文件 /etc/sysctl.conf,添加或修改以下内容:
vm.swappiness=1

让配置生效命令

sudo sysctl -p
cat /proc/sys/vm/swappiness

提交时间(Flush 落盘时间)

向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。

这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。

疑问点: 如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,多写几次内存总比写一次硬盘块而且保险,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

配置在Kafka Broker 的配置文件 server.properties中:

log.flush.interval.messages=10000       # 设置消息数量触发落盘
log.flush.interval.ms=10000             # 设置时间间隔触发落盘(10秒)
log.flush.scheduler.interval.ms=1000    # 后台检查间隔

如果两个条件都设置(log.flush.interval.messages 和 log.flush.interval.ms),只要满足任意一个条件就会触发落盘。

对性能要求较高的场景,可以适当增加 log.flush.interval.ms 和 log.flush.interval.messages 的值,以减少磁盘 I/O,但可能增加数据丢失的风险。


http://www.niftyadmin.cn/n/5798399.html

相关文章

Linux系统的阻塞方式和非阻塞方式是什么意思?

在Linux系统中&#xff0c;阻塞方式和非阻塞方式是描述进程&#xff08;或线程&#xff09;在执行某些操作&#xff08;如I/O操作&#xff09;时的行为模式。这些模式影响了进程的运行状态以及如何处理资源等待。以下是详细解释&#xff1a; 阻塞方式 阻塞方式是指进程在执行某…

重温设计模式--模板方法模式

文章目录 一、模板方法模式概述二、模板方法模式UML图三、优点1代码复用性高2可维护性好3扩展性强 四、缺点五、使用场景六、C 代码示例1七、 C 代码示例2 一、模板方法模式概述 定义&#xff1a;定义一个操作中的算法骨架&#xff0c;而降一些步骤延迟到子类中。模板方法使得…

R9000P键盘失灵解决办法

问题描述 突然&#xff0c;就是很突然&#xff0c;我买的R9000P 2024不到三个月&#xff0c;键盘突然都不能用了&#xff0c;是所有键盘按键都无效的那种。&#xff08;可以使用外接键盘&#xff09; 解决办法 我本科室友说的好哈&#xff0c;全坏全没坏。 &#xff08;该解…

ssr实现方案

目录 序言 一、流程 二、前端要做的事情 三、节点介绍 四、总结 序言 本文不是详细的实现过程&#xff0c;是让你最快最直接的理解ssr的真正实现方法&#xff0c;有前端经验的同学&#xff0c;能够很好的理解过程&#xff0c;细节根据具体项目实现 一、前端要做的事情 1.…

AWTK-WEB 快速入门(2) - JS 应用程序

AWTK 可以使用相同的技术栈开发各种平台的应用程序。有时我们需要使用 Web 界面与设备进行交互&#xff0c;本文介绍一下如何使用 JS 语言开发 AWTK-WEB 应用程序。 用 AWTK Designer 新建一个应用程序 先安装 AWTK Designer&#xff1a; https://awtk.zlg.cn/web/index.html…

ubuntu开机进入initramfs状态

虚拟机卡死成功起后进入了initramfs状态&#xff0c;可能是跟文件系统有问题或者检索不到根文件系统&#xff0c;或者是配置错误&#xff0c;系统磁盘等硬件问题导致 开机后进入如下图的界面&#xff0c; 文中有一条提示 要手动fsck 命令修复 /dev/sda1 命令如下 fsck /de…

查询Elasticsearch索引刷新间隔

要查询 Elasticsearch 索引的刷新间隔&#xff08;refresh_interval&#xff09;&#xff0c;你可以使用以下方法&#xff1a; 1. 使用 GET 请求查询索引设置 你可以通过 GET 请求获取索引的设置信息&#xff0c;其中包括 refresh_interval 的值。 示例命令 GET /your_inde…

Retrofit源码分析:动态代理获取Api接口实例,解析注解生成request,线程切换

目录 一&#xff0c;Retrofit的基本使用 1.定义api接口 2.创建Retrofit实例 3.获取api接口实例发起请求 二&#xff0c;静态代理和动态代理 1&#xff0c;静态代理 2&#xff0c;动态代理 三&#xff0c;动态代理获取Api接口实例 四&#xff0c;解析接口方法注解&…